Diffstat (limited to 'storage/src'): 27 files changed, 2281 insertions, 750 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 4ec49b5c6f8..97fccf58901 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -1116,7 +1116,7 @@ TEST_F(BucketDBUpdaterTest, notify_bucket_change_from_node_down) { getBucketDBUpdater().onNotifyBucketChange(cmd); } // Enable here to avoid having request bucket info be silently swallowed - // (sendRequestBucketInfo drops message if node is down). + // (send_request_bucket_info drops message if node is down). enableDistributorClusterState("distributor:1 storage:2 .0.s:d"); ASSERT_EQ(std::string("BucketId(0x4000000000000001) : " diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 4c574609df5..7958306db5f 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -239,6 +239,7 @@ DistributorTest::DistributorTest() DistributorTest::~DistributorTest() = default; +// TODO -> stripe test TEST_F(DistributorTest, operation_generation) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -257,6 +258,7 @@ TEST_F(DistributorTest, operation_generation) { EXPECT_EQ("Visitor Create", testOp(cmd)); } +// TODO -> stripe test TEST_F(DistributorTest, operations_generated_and_started_without_duplicates) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -271,6 +273,8 @@ TEST_F(DistributorTest, operations_generated_and_started_without_duplicates) { ASSERT_EQ(6, _sender.commands().size()); } +// TODO -> stripe test +// TODO also need to impl/test cross-stripe cluster state changes TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) { setupDistributor(Redundancy(1), NodeCount(2), "storage:1 .0.s:d distributor:1"); @@ -291,6 +295,8 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) { EXPECT_TRUE(_distributor->isInRecoveryMode()); } +// TODO -> stripe test +// TODO how to throttle across stripes? 
TEST_F(DistributorTest, operations_are_throttled) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); getConfig().setMinPendingMaintenanceOps(1); @@ -303,6 +309,7 @@ TEST_F(DistributorTest, operations_are_throttled) { ASSERT_EQ(1, _sender.commands().size()); } +// TODO -> stripe test TEST_F(DistributorTest, handle_unknown_maintenance_reply) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -322,6 +329,7 @@ TEST_F(DistributorTest, handle_unknown_maintenance_reply) { } } +// TODO -> generic, non distr/stripe test TEST_F(DistributorTest, contains_time_statement) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -333,6 +341,7 @@ TEST_F(DistributorTest, contains_time_statement) { EXPECT_TRUE(getConfig().containsTimeStatement("testdoctype1.headerfield == now() - 3600")); } +// TODO -> stripe test TEST_F(DistributorTest, update_bucket_database) { enableDistributorClusterState("distributor:1 storage:3"); @@ -402,6 +411,8 @@ public: } +// TODO -> stripe test +// TODO need to impl/test cross-stripe status requests TEST_F(DistributorTest, tick_processes_status_requests) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -430,6 +441,8 @@ TEST_F(DistributorTest, tick_processes_status_requests) { EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)")); } +// TODO -> distributor test since it owns metric hook +// TODO need to impl/test cross-stripe metrics aggregation TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); // To ensure we count all operations, not just those fitting within the @@ -484,6 +497,7 @@ uint64_t db_sample_interval_sec(const Distributor& d) noexcept { } +// TODO -> stripe test TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { getClock().setAbsoluteTimeInSeconds(1000); @@ -521,6 +535,8 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim EXPECT_GT(now_used, last_used); } +// TODO -> stripe test +// TODO need to impl/test cross-stripe config propagation TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configuration) { using namespace vespa::config::content::core; @@ -557,6 +573,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets)); } +// TODO -> stripe test TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); lib::ClusterState newState("storage:10 distributor:10"); @@ -578,6 +595,7 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket)); } +// TODO -> stripe test TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_current_time) { setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); getClock().setAbsoluteTimeInSeconds(101234); @@ -591,6 +609,7 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur EXPECT_EQ(101234, e->getLastGarbageCollectionTime()); } +// TODO -> stripe test TEST_F(DistributorTest, merge_stats_are_accumulated_during_database_iteration) { setupDistributor(Redundancy(2), NodeCount(3), "storage:3 distributor:1"); // Copies out of sync. 
Not possible for distributor to _reliably_ tell @@ -662,6 +681,7 @@ DistributorTest::assertBucketSpaceStats(size_t expBucketPending, size_t expBucke * their state checkers at all, we won't get any statistics from any other * operations for the bucket. */ +// TODO -> stripe test TEST_F(DistributorTest, stats_generated_for_preempted_operations) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); // For this test it suffices to have a single bucket with multiple aspects @@ -686,6 +706,7 @@ TEST_F(DistributorTest, stats_generated_for_preempted_operations) { } } +// TODO -> distributor test TEST_F(DistributorTest, host_info_reporter_config_is_propagated_to_reporter) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); @@ -699,11 +720,13 @@ TEST_F(DistributorTest, host_info_reporter_config_is_propagated_to_reporter) { EXPECT_FALSE(distributor_host_info_reporter().isReportingEnabled()); } +// TODO -> stripe test (though config is a bit of a special case...) TEST_F(DistributorTest, replica_counting_mode_is_configured_to_trusted_by_default) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::TRUSTED, currentReplicaCountingMode()); } +// TODO -> stripe test TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_updater) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); ConfigBuilder builder; @@ -712,6 +735,7 @@ TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_upd EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::ANY, currentReplicaCountingMode()); } +// TODO -> stripe test TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_propagated_to_internal_config) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); ConfigBuilder builder; @@ -720,11 +744,13 @@ TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_ EXPECT_EQ(getConfig().max_consecutively_inhibited_maintenance_ticks(), 123); } +// TODO -> stripe test TEST_F(DistributorTest, bucket_activation_is_enabled_by_default) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); EXPECT_FALSE(getConfig().isBucketActivationDisabled()); } +// TODO -> stripe test TEST_F(DistributorTest, bucket_activation_config_is_propagated_to_distributor_configuration) { using namespace vespa::config::content::core; @@ -747,6 +773,7 @@ DistributorTest::configureMaxClusterClockSkew(int seconds) { _distributor->enableNextConfig(); } +// TODO -> stripe test TEST_F(DistributorTest, max_clock_skew_config_is_propagated_to_distributor_config) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); @@ -817,6 +844,7 @@ void DistributorTest::assertNoMessageBounced() { // TODO refactor this to set proper highest timestamp as part of bucket info // reply once we have the "highest timestamp across all owned buckets" feature // in place. +// TODO where does this truly belong? 
TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) { setupDistributor(Redundancy(2), NodeCount(2), "bits:1 storage:1 distributor:2"); @@ -846,6 +874,7 @@ void DistributorTest::configure_mutation_sequencing(bool enabled) { _distributor->enableNextConfig(); } +// TODO -> stripe test TEST_F(DistributorTest, sequencing_config_is_propagated_to_distributor_config) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); @@ -871,6 +900,7 @@ DistributorTest::configure_merge_busy_inhibit_duration(int seconds) { _distributor->enableNextConfig(); } +// TODO -> stripe test TEST_F(DistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_distributor_config) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); @@ -878,6 +908,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_dist EXPECT_EQ(getConfig().getInhibitMergesOnBusyNodeDuration(), std::chrono::seconds(7)); } +// TODO -> stripe test TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker) { setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1"); addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t"); @@ -903,6 +934,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes EXPECT_FALSE(node_info.isBusy(0)); } +// TODO -> stripe test TEST_F(DistributorTest, external_client_requests_are_handled_individually_in_priority_order) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a"); @@ -931,6 +963,7 @@ TEST_F(DistributorTest, external_client_requests_are_handled_individually_in_pri EXPECT_THAT(actual, ContainerEq(expected)); } +// TODO -> stripe test TEST_F(DistributorTest, internal_messages_are_started_in_fifo_order_batch) { // To test internal request ordering, we use NotifyBucketChangeCommand // for the reason that it explicitly updates the bucket database for @@ -959,6 +992,8 @@ TEST_F(DistributorTest, internal_messages_are_started_in_fifo_order_batch) { EXPECT_EQ(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo()); } +// TODO -> stripe test +// TODO also test that closing distributor closes stripes TEST_F(DistributorTest, closing_aborts_priority_queued_client_requests) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); document::BucketId bucket(16, 1); @@ -997,6 +1032,9 @@ void assert_invalid_stats_for_all_spaces( } +// TODO -> stripe test +// TODO must impl/test cross-stripe bucket space stats +// TODO cross-stripe recovery mode handling how? 
TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) { // Set up a cluster state + DB contents which implies merge maintenance ops setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); @@ -1018,6 +1056,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) { assert_invalid_stats_for_all_spaces(stats, 2); } +// TODO figure out interaction between stripes and distributors on this one TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_replies) { setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); // Should not send explicit replies during init stage diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 7929cc1c906..8a0e18c86af 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -66,7 +66,7 @@ DistributorTestUtil::setup_distributor(int redundancy, // This is for all intents and purposes a hack to avoid having the // distributor treat setting the distribution explicitly as a signal that // it should send RequestBucketInfo to all configured nodes. - // If we called storageDistributionChanged followed by enableDistribution + // If we called storage_distribution_changed followed by enableDistribution // explicitly (which is what happens in "real life"), that is what would // take place. // The inverse case of this can be explicitly accomplished by calling @@ -338,7 +338,7 @@ DistributorTestUtil::disableBucketActivationInConfig(bool disable) getConfig().configure(config); } -BucketDBUpdater& +StripeBucketDBUpdater& DistributorTestUtil::getBucketDBUpdater() { return _distributor->bucket_db_updater(); } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index d3c0445d5b5..ddf153a1406 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -17,7 +17,7 @@ namespace framework { struct TickingThreadPool; } namespace distributor { -class BucketDBUpdater; +class StripeBucketDBUpdater; class Distributor; class DistributorBucketSpace; class DistributorBucketSpaceRepo; @@ -112,7 +112,7 @@ public: int idx = -1, api::ReturnCode::Result result = api::ReturnCode::OK); - BucketDBUpdater& getBucketDBUpdater(); + StripeBucketDBUpdater& getBucketDBUpdater(); IdealStateManager& getIdealStateManager(); ExternalOperationHandler& getExternalOperationHandler(); storage::distributor::DistributorStripeComponent& distributor_component(); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index ea963b227e2..53f2e7c8413 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -31,6 +31,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _maxPendingMaintenanceOps(1000), _maxVisitorsPerNodePerClientVisitor(4), _minBucketsPerVisitor(5), + _num_distributor_stripes(0), _maxClusterClockSkew(0), _inhibitMergeSendingOnBusyNodeDuration(60s), _simulated_db_pruning_latency(0), @@ -181,6 +182,8 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist } _simulated_db_pruning_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbPruningLatencyMsec)); _simulated_db_merging_latency 
= std::chrono::milliseconds(std::max(0, config.simulatedDbMergingLatencyMsec)); + + _num_distributor_stripes = std::max(0, config.numDistributorStripes); // TODO STRIPE test LOG(debug, "Distributor now using new configuration parameters. Split limits: %d docs/%d bytes. " diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 9c1456fa9fd..479298ff082 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -265,6 +265,8 @@ public: return _enable_revert; } + uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -297,6 +299,8 @@ private: uint32_t _maxVisitorsPerNodePerClientVisitor; uint32_t _minBucketsPerVisitor; + uint32_t _num_distributor_stripes; + MaintenancePriorities _maintenancePriorities; std::chrono::seconds _maxClusterClockSkew; std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration; diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index d82d14831f4..57d6a23c79f 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(storage_distributor activecopy.cpp blockingoperationstarter.cpp bucket_db_prune_elision.cpp + bucket_space_distribution_configs.cpp bucket_space_distribution_context.cpp bucketdbupdater.cpp bucketgctimecalculator.cpp @@ -23,6 +24,7 @@ vespa_add_library(storage_distributor ideal_service_layer_nodes_bundle.cpp idealstatemanager.cpp idealstatemetricsset.cpp + legacy_single_stripe_accessor.cpp messagetracker.cpp nodeinfo.cpp operation_routing_snapshot.cpp @@ -40,6 +42,7 @@ vespa_add_library(storage_distributor statechecker.cpp statecheckers.cpp statusreporterdelegate.cpp + stripe_bucket_db_updater.cpp throttlingoperationstarter.cpp update_metric_set.cpp visitormetricsset.cpp diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp new file mode 100644 index 00000000000..da5769411d4 --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp @@ -0,0 +1,17 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include "bucket_space_distribution_configs.h" +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/storage/common/global_bucket_space_distribution_converter.h> +#include <vespa/vdslib/distribution/distribution.h> + +namespace storage::distributor { + +BucketSpaceDistributionConfigs +BucketSpaceDistributionConfigs::from_default_distribution(std::shared_ptr<const lib::Distribution> distribution) { + BucketSpaceDistributionConfigs ret; + ret.space_configs.emplace(document::FixedBucketSpaces::global_space(), GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution)); + ret.space_configs.emplace(document::FixedBucketSpaces::default_space(), std::move(distribution)); + return ret; +} + +} diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h new file mode 100644 index 00000000000..9ebd8ef9732 --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h @@ -0,0 +1,27 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/document/bucket/bucketspace.h> +#include <map> +#include <memory> + +namespace storage::lib { class Distribution; } + +namespace storage::distributor { + +/** + * Represents a complete mapping of all known bucket spaces to their appropriate, + * (possibly derived) distribution config. + */ +struct BucketSpaceDistributionConfigs { + std::map<document::BucketSpace, std::shared_ptr<const lib::Distribution>> space_configs; + + std::shared_ptr<const lib::Distribution> get_or_nullptr(document::BucketSpace space) const noexcept { + auto iter = space_configs.find(space); + return (iter != space_configs.end()) ? iter->second : std::shared_ptr<const lib::Distribution>(); + } + + static BucketSpaceDistributionConfigs from_default_distribution(std::shared_ptr<const lib::Distribution>); +}; + +} diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 6735ea1e533..6dcb5ba732e 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -2,12 +2,15 @@ #include "bucketdbupdater.h" #include "bucket_db_prune_elision.h" +#include "bucket_space_distribution_configs.h" #include "bucket_space_distribution_context.h" #include "distributor.h" #include "distributor_bucket_space.h" #include "distributormetricsset.h" #include "simpleclusterinformation.h" +#include "stripe_access_guard.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/vdslib/distribution/distribution.h> @@ -24,74 +27,71 @@ using document::BucketSpace; namespace storage::distributor { -BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, - DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, +BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, // FIXME STRIPE! 
DistributorMessageSender& sender, - DistributorComponentRegister& compReg) - : framework::StatusReporter("bucketdb", "Bucket DB Updater"), - _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), - _node_ctx(_distributorComponent), - _op_ctx(_distributorComponent), - _distributor_interface(_distributorComponent.getDistributor()), - _delayedRequests(), - _sentMessages(), - _pendingClusterState(), + DistributorComponentRegister& comp_reg, + StripeAccessor& stripe_accessor) + : framework::StatusReporter("temp_bucketdb", "Bucket DB Updater"), // TODO STRIPE rename once duplication is removed + _stripe_accessor(stripe_accessor), + _active_state_bundle(lib::ClusterState()), + _dummy_mutable_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())), + _dummy_read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())), + _distributor_component(owner, *_dummy_mutable_bucket_space_repo, *_dummy_read_only_bucket_space_repo, comp_reg, "Bucket DB Updater"), + _node_ctx(_distributor_component), + _op_ctx(_distributor_component), + _distributor_interface(_distributor_component.getDistributor()), + _delayed_requests(), + _sent_messages(), + _pending_cluster_state(), _history(), _sender(sender), - _enqueuedRechecks(), - _outdatedNodesMap(), - _transitionTimer(_node_ctx.clock()), - _stale_reads_enabled(false), - _active_distribution_contexts(), - _explicit_transition_read_guard(), - _distribution_context_mutex() + _enqueued_rechecks(), + _outdated_nodes_map(), + _transition_timer(_node_ctx.clock()), + _stale_reads_enabled(false) { - for (auto& elem : _op_ctx.bucket_space_repo()) { - _active_distribution_contexts.emplace( - elem.first, - BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index())); - _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); - } + // FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle! 
+ propagate_active_state_bundle_internally(); + bootstrap_distribution_config(_distributor_component.getDistribution()); } BucketDBUpdater::~BucketDBUpdater() = default; -OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const { - const auto bucket_space = bucket.getBucketSpace(); - std::lock_guard lock(_distribution_context_mutex); - auto active_state_iter = _active_distribution_contexts.find(bucket_space); - assert(active_state_iter != _active_distribution_contexts.cend()); - auto& state = *active_state_iter->second; - if (!state.bucket_owned_in_active_state(bucket.getBucketId())) { - return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); +void +BucketDBUpdater::propagate_active_state_bundle_internally() { + for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) { + for (auto& iter : *repo) { + iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first)); + } } - const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId()); - if (!bucket_present_in_mutable_db && !stale_reads_enabled()) { - return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); +} + +void +BucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) { + auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) { + repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); + repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); } - const auto& space_repo = bucket_present_in_mutable_db - ? _op_ctx.bucket_space_repo() - : _op_ctx.read_only_bucket_space_repo(); - auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space); - assert(existing_guard_iter != _explicit_transition_read_guard.cend()); - auto db_guard = existing_guard_iter->second - ? existing_guard_iter-> second - : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard(); - return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo); + // TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition? + // ... need to take a guard if so, so can probably not be done at ctor time..? } +// TODO STRIPE what to do with merge guards... +// FIXME what about bucket DB replica update timestamp allocations?! Replace with u64 counter..? +// Must at the very least ensure we use stripe-local TS generation for DB inserts...! i.e. no global TS +// Or do we have to touch these at all here? Just defer all this via stripe interface? 
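The bootstrap path above derives the global space's config from the default space's via GlobalBucketSpaceDistributionConverter, which is exactly what the new BucketSpaceDistributionConfigs helper packages up. A minimal caller-side sketch of that helper, assuming the caller already holds the default-space lib::Distribution (apply_new_distribution is a hypothetical wrapper; the types and member functions are the ones introduced in this patch):

#include <vespa/storage/distributor/bucket_space_distribution_configs.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <cassert>
#include <memory>

using namespace storage::distributor;

// Hypothetical caller: builds the space -> config mapping and inspects it.
void apply_new_distribution(std::shared_ptr<const storage::lib::Distribution> default_distribution) {
    // One (possibly derived) config per known bucket space, from the default-space config.
    auto configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(default_distribution));
    // Both fixed spaces are present; the global space holds a converted derivation.
    assert(configs.get_or_nullptr(document::FixedBucketSpaces::default_space()));
    assert(configs.get_or_nullptr(document::FixedBucketSpaces::global_space()));
    // Unknown spaces yield a null shared_ptr rather than throwing, per get_or_nullptr's contract.
}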
void BucketDBUpdater::flush() { - for (auto & entry : _sentMessages) { + for (auto & entry : _sent_messages) { // Cannot sendDown MergeBucketReplies during flushing, since // all lower links have been closed if (entry.second._mergeReplyGuard) { entry.second._mergeReplyGuard->resetReply(); } } - _sentMessages.clear(); + _sent_messages.clear(); } void @@ -102,26 +102,19 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden } bool -BucketDBUpdater::shouldDeferStateEnabling() const noexcept +BucketDBUpdater::should_defer_state_enabling() const noexcept { return stale_reads_enabled(); } bool -BucketDBUpdater::hasPendingClusterState() const +BucketDBUpdater::has_pending_cluster_state() const { - return static_cast<bool>(_pendingClusterState); -} - -const lib::ClusterState* -BucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const { - return (hasPendingClusterState() - ? _pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(space).get() - : nullptr); + return static_cast<bool>(_pending_cluster_state); } void -BucketDBUpdater::sendRequestBucketInfo( +BucketDBUpdater::send_request_bucket_info( uint16_t node, const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) @@ -135,116 +128,50 @@ BucketDBUpdater::sendRequestBucketInfo( auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets); - LOG(debug, - "Sending request bucket info command %" PRIu64 " for " - "bucket %s to node %u", - msg->getMsgId(), - bucket.toString().c_str(), - node); + LOG(debug, "Sending request bucket info command %" PRIu64 " for bucket %s to node %u", + msg->getMsgId(), bucket.toString().c_str(), node); msg->setPriority(50); msg->setAddress(_node_ctx.node_address(node)); - _sentMessages[msg->getMsgId()] = + _sent_messages[msg->getMsgId()] = BucketRequest(node, _op_ctx.generate_unique_timestamp(), bucket, mergeReplyGuard); _sender.sendCommand(msg); } void -BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx, - const document::Bucket& bucket) -{ - sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>()); -} - -namespace { - -class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor { - using NewEntries = std::vector<BucketDatabase::Entry>; - NewEntries::const_iterator _current; - const NewEntries::const_iterator _last; -public: - explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries) - : _current(new_entries.cbegin()), - _last(new_entries.cend()) - {} - - Result merge(BucketDatabase::Merger& m) override { - const uint64_t key_to_insert = m.bucket_key(); - uint64_t key_at_cursor = 0; - while (_current != _last) { - key_at_cursor = _current->getBucketId().toKey(); - if (key_at_cursor >= key_to_insert) { - break; - } - m.insert_before_current(_current->getBucketId(), *_current); - ++_current; - } - if ((_current != _last) && (key_at_cursor == key_to_insert)) { - // If we encounter a bucket that already exists, replace value wholesale. - // Don't try to cleverly merge replicas, as the values we currently hold - // in the read-only DB may be stale. - // Note that this case shouldn't really happen, since we only add previously - // owned buckets to the read-only DB, and subsequent adds to a non-empty DB - // can only happen for state preemptions. Since ownership is not regained - // before a state is stable, a bucket is only added once. But we handle it - // anyway in case this changes at some point in the future. 
- m.current_entry() = *_current; - return Result::Update; - } - return Result::KeepUnchanged; - } - - void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override { - for (; _current != _last; ++_current) { - inserter.insert_at_end(_current->getBucketId(), *_current); - } - } -}; - -} - -void -BucketDBUpdater::removeSuperfluousBuckets( - const lib::ClusterStateBundle& newState, +BucketDBUpdater::remove_superfluous_buckets( + StripeAccessGuard& guard, + const lib::ClusterStateBundle& new_state, bool is_distribution_config_change) { - const bool move_to_read_only_db = shouldDeferStateEnabling(); const char* up_states = _op_ctx.storage_node_up_states(); + // TODO STRIPE explicit space -> config mapping, don't get via repo + // ... but we need to get the current cluster state per space..! for (auto& elem : _op_ctx.bucket_space_repo()) { - const auto& newDistribution(elem.second->getDistribution()); - const auto& oldClusterState(elem.second->getClusterState()); - const auto& new_cluster_state = newState.getDerivedClusterState(elem.first); + const auto& old_cluster_state(elem.second->getClusterState()); + const auto& new_cluster_state = new_state.getDerivedClusterState(elem.first); // Running a full DB sweep is expensive, so if the cluster state transition does // not actually indicate that buckets should possibly be removed, we elide it entirely. if (!is_distribution_config_change - && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states)) + && db_pruning_may_be_elided(old_cluster_state, *new_cluster_state, up_states)) { LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'", document::FixedBucketSpaces::to_string(elem.first).data(), - oldClusterState.toString().c_str(), new_cluster_state->toString().c_str()); + old_cluster_state.toString().c_str(), new_cluster_state->toString().c_str()); continue; } - - auto& bucketDb(elem.second->getBucketDatabase()); - auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase()); - - // Remove all buckets not belonging to this distributor, or - // being on storage nodes that are no longer up. - MergingNodeRemover proc( - oldClusterState, - *new_cluster_state, - _node_ctx.node_index(), - newDistribution, - up_states, - move_to_read_only_db); - - bucketDb.merge(proc); - if (move_to_read_only_db) { - ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries()); - readOnlyDb.merge(read_only_merger); + // TODO STRIPE should we also pass old state and distr config? Must ensure we're in sync with stripe... + // .. but config is set synchronously via the guard upon pending state creation edge + auto maybe_lost = guard.remove_superfluous_buckets(elem.first, *new_cluster_state, is_distribution_config_change); + if (maybe_lost.buckets != 0) { + LOGBM(info, "After cluster state change %s, %zu buckets no longer " + "have available replicas. %zu documents in these buckets will " + "be unavailable until nodes come back up", + old_cluster_state.getTextualDifference(*new_cluster_state).c_str(), + maybe_lost.buckets, maybe_lost.documents); } maybe_inject_simulated_db_pruning_delay(); } @@ -271,63 +198,58 @@ BucketDBUpdater::maybe_inject_simulated_db_merging_delay() { } void -BucketDBUpdater::ensureTransitionTimerStarted() +BucketDBUpdater::ensure_transition_timer_started() { // Don't overwrite start time if we're already processing a state, as // that will make transition times appear artificially low. 
- if (!hasPendingClusterState()) { - _transitionTimer = framework::MilliSecTimer( - _node_ctx.clock()); + if (!has_pending_cluster_state()) { + _transition_timer = framework::MilliSecTimer(_node_ctx.clock()); } } void -BucketDBUpdater::completeTransitionTimer() +BucketDBUpdater::complete_transition_timer() { _distributor_interface.getMetrics() - .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble()); + .stateTransitionTime.addValue(_transition_timer.getElapsedTimeAsDouble()); } void -BucketDBUpdater::clearReadOnlyBucketRepoDatabases() +BucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfigs& configs) { - for (auto& space : _op_ctx.read_only_bucket_space_repo()) { - space.second->getBucketDatabase().clear(); - } -} + ensure_transition_timer_started(); -void -BucketDBUpdater::storageDistributionChanged() -{ - ensureTransitionTimerStarted(); - - removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true); + auto guard = _stripe_accessor.rendezvous_and_hold_all(); + // FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?! + guard->update_distribution_config(configs); + remove_superfluous_buckets(*guard, _op_ctx.cluster_state_bundle(), true); auto clusterInfo = std::make_shared<const SimpleClusterInformation>( _node_ctx.node_index(), _op_ctx.cluster_state_bundle(), _op_ctx.storage_node_up_states()); - _pendingClusterState = PendingClusterState::createForDistributionChange( + _pending_cluster_state = PendingClusterState::createForDistributionChange( _node_ctx.clock(), std::move(clusterInfo), _sender, - _op_ctx.bucket_space_repo(), - _op_ctx.generate_unique_timestamp()); - _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); + _op_ctx.bucket_space_repo(), // TODO STRIPE cannot use! + _op_ctx.generate_unique_timestamp()); // TODO STRIPE must ensure no stripes can generate < this + _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap(); + + guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle()); } void -BucketDBUpdater::replyToPreviousPendingClusterStateIfAny() +BucketDBUpdater::reply_to_previous_pending_cluster_state_if_any() { - if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) { + if (_pending_cluster_state.get() && _pending_cluster_state->hasCommand()) { _distributor_interface.getMessageSender().sendUp( - std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand())); + std::make_shared<api::SetSystemStateReply>(*_pending_cluster_state->getCommand())); } } void -BucketDBUpdater::replyToActivationWithActualVersion( +BucketDBUpdater::reply_to_activation_with_actual_version( const api::ActivateClusterStateVersionCommand& cmd, uint32_t actualVersion) { @@ -336,104 +258,49 @@ BucketDBUpdater::replyToActivationWithActualVersion( _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues } -void BucketDBUpdater::update_read_snapshot_before_db_pruning() { - std::lock_guard lock(_distribution_context_mutex); - for (auto& elem : _op_ctx.bucket_space_repo()) { - // At this point, we're still operating with a distribution context _without_ a - // pending state, i.e. anyone using the context will expect to find buckets - // in the DB that correspond to how the database looked like prior to pruning - // buckets from the DB. 
To ensure this is not violated, take a snapshot of the - // _mutable_ DB and expose this. This snapshot only lives until we atomically - // flip to expose a distribution context that includes the new, pending state. - // At that point, the read-only DB is known to contain the buckets that have - // been pruned away, so we can release the mutable DB snapshot safely. - // TODO test for, and handle, state preemption case! - _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard(); - } -} - - -void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { - std::lock_guard lock(_distribution_context_mutex); - const auto old_default_state = _op_ctx.bucket_space_repo().get( - document::FixedBucketSpaces::default_space()).cluster_state_sp(); - for (auto& elem : _op_ctx.bucket_space_repo()) { - auto new_distribution = elem.second->distribution_sp(); - auto old_cluster_state = elem.second->cluster_state_sp(); - auto new_cluster_state = new_state.getDerivedClusterState(elem.first); - _active_distribution_contexts.insert_or_assign( - elem.first, - BucketSpaceDistributionContext::make_state_transition( - std::move(old_cluster_state), - old_default_state, - std::move(new_cluster_state), - std::move(new_distribution), - _node_ctx.node_index())); - // We can now remove the explicit mutable DB snapshot, as the buckets that have been - // pruned away are visible in the read-only DB. - _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>(); - } -} - -void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { - std::lock_guard lock(_distribution_context_mutex); - const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space()); - for (auto& elem : _op_ctx.bucket_space_repo()) { - auto new_distribution = elem.second->distribution_sp(); - auto new_cluster_state = activated_state.getDerivedClusterState(elem.first); - _active_distribution_contexts.insert_or_assign( - elem.first, - BucketSpaceDistributionContext::make_stable_state( - std::move(new_cluster_state), - default_cluster_state, - std::move(new_distribution), - _node_ctx.node_index())); - } -} - bool BucketDBUpdater::onSetSystemState( const std::shared_ptr<api::SetSystemStateCommand>& cmd) { - LOG(debug, - "Received new cluster state %s", + LOG(debug, "Received new cluster state %s", cmd->getSystemState().toString().c_str()); - const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle(); const lib::ClusterStateBundle& state = cmd->getClusterStateBundle(); - if (state == oldState) { + if (state == _active_state_bundle) { return false; } - ensureTransitionTimerStarted(); - // Separate timer since _transitionTimer might span multiple pending states. + ensure_transition_timer_started(); + // Separate timer since _transition_timer might span multiple pending states. 
framework::MilliSecTimer process_timer(_node_ctx.clock()); - update_read_snapshot_before_db_pruning(); + + auto guard = _stripe_accessor.rendezvous_and_hold_all(); + guard->update_read_snapshot_before_db_pruning(); const auto& bundle = cmd->getClusterStateBundle(); - removeSuperfluousBuckets(bundle, false); - update_read_snapshot_after_db_pruning(bundle); - replyToPreviousPendingClusterStateIfAny(); + remove_superfluous_buckets(*guard, bundle, false); + guard->update_read_snapshot_after_db_pruning(bundle); + reply_to_previous_pending_cluster_state_if_any(); auto clusterInfo = std::make_shared<const SimpleClusterInformation>( _node_ctx.node_index(), _op_ctx.cluster_state_bundle(), _op_ctx.storage_node_up_states()); - _pendingClusterState = PendingClusterState::createForClusterStateChange( + _pending_cluster_state = PendingClusterState::createForClusterStateChange( _node_ctx.clock(), std::move(clusterInfo), _sender, - _op_ctx.bucket_space_repo(), + _op_ctx.bucket_space_repo(), // TODO STRIPE remove cmd, - _outdatedNodesMap, - _op_ctx.generate_unique_timestamp()); - _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); + _outdated_nodes_map, + _op_ctx.generate_unique_timestamp()); // FIXME STRIPE must be atomic across all threads + _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap(); _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); - _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); - if (isPendingClusterStateCompleted()) { - processCompletedPendingClusterState(); + guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle()); + if (is_pending_cluster_state_completed()) { + process_completed_pending_cluster_state(*guard); } return true; } @@ -441,25 +308,26 @@ BucketDBUpdater::onSetSystemState( bool BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) { - if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) { - const auto pending_version = _pendingClusterState->clusterStateVersion(); + if (has_pending_cluster_state() && _pending_cluster_state->isVersionedTransition()) { + const auto pending_version = _pending_cluster_state->clusterStateVersion(); if (pending_version == cmd->version()) { - if (isPendingClusterStateCompleted()) { - assert(_pendingClusterState->isDeferred()); - activatePendingClusterState(); + if (is_pending_cluster_state_completed()) { + assert(_pending_cluster_state->isDeferred()); + auto guard = _stripe_accessor.rendezvous_and_hold_all(); + activate_pending_cluster_state(*guard); } else { LOG(error, "Received cluster state activation for pending version %u " "without pending state being complete yet. This is not expected, " "as no activation should be sent before all distributors have " "reported that state processing is complete.", pending_version); - replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed). + reply_to_activation_with_actual_version(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed). 
return true; } } else { - replyToActivationWithActualVersion(*cmd, pending_version); + reply_to_activation_with_actual_version(*cmd, pending_version); return true; } - } else if (shouldDeferStateEnabling()) { + } else if (should_defer_state_enabling()) { // Likely just a resend, but log warn for now to get a feel of how common it is. LOG(warning, "Received cluster state activation command for version %u, which " "has no corresponding pending state. Likely resent operation.", cmd->version()); @@ -471,6 +339,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa return false; } +// TODO remove entirely from this abstraction level? BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard() { if (_reply) { @@ -488,71 +357,36 @@ BucketDBUpdater::onMergeBucketReply( // actually merged (source-only nodes?) we request the bucket info of the // bucket again to make sure it's ok. for (uint32_t i = 0; i < reply->getNodes().size(); i++) { - sendRequestBucketInfo(reply->getNodes()[i].index, - reply->getBucket(), - replyGuard); + send_request_bucket_info(reply->getNodes()[i].index, + reply->getBucket(), + replyGuard); } return true; } void -BucketDBUpdater::enqueueRecheckUntilPendingStateEnabled( +BucketDBUpdater::enqueue_recheck_until_pending_state_enabled( uint16_t node, const document::Bucket& bucket) { - LOG(spam, - "DB updater has a pending cluster state, enqueuing recheck " - "of bucket %s on node %u until state is done processing", - bucket.toString().c_str(), - node); - _enqueuedRechecks.insert(EnqueuedBucketRecheck(node, bucket)); + LOG(spam, "DB updater has a pending cluster state, enqueuing recheck " + "of bucket %s on node %u until state is done processing", + bucket.toString().c_str(), node); + _enqueued_rechecks.insert(EnqueuedBucketRecheck(node, bucket)); } void -BucketDBUpdater::sendAllQueuedBucketRechecks() +BucketDBUpdater::send_all_queued_bucket_rechecks() { - LOG(spam, - "Sending %zu queued bucket rechecks previously received " - "via NotifyBucketChange commands", - _enqueuedRechecks.size()); + LOG(spam, "Sending %zu queued bucket rechecks previously received " + "via NotifyBucketChange commands", + _enqueued_rechecks.size()); - for (const auto & entry :_enqueuedRechecks) { - sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>()); + for (const auto & entry :_enqueued_rechecks) { + send_request_bucket_info(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>()); } - _enqueuedRechecks.clear(); -} - -bool -BucketDBUpdater::onNotifyBucketChange( - const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd) -{ - // Immediately schedule reply to ensure it is sent. - _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd)); - - if (!cmd->getBucketInfo().valid()) { - LOG(error, - "Received invalid bucket info for bucket %s from notify bucket " - "change! 
Not updating bucket.", - cmd->getBucketId().toString().c_str()); - return true; - } - LOG(debug, - "Received notify bucket change from node %u for bucket %s with %s.", - cmd->getSourceIndex(), - cmd->getBucketId().toString().c_str(), - cmd->getBucketInfo().toString().c_str()); - - if (hasPendingClusterState()) { - enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(), - cmd->getBucket()); - } else { - sendRequestBucketInfo(cmd->getSourceIndex(), - cmd->getBucket(), - std::shared_ptr<MergeReplyGuard>()); - } - - return true; + _enqueued_rechecks.clear(); } bool sort_pred(const BucketListMerger::BucketEntry& left, @@ -563,34 +397,34 @@ bool sort_pred(const BucketListMerger::BucketEntry& left, bool BucketDBUpdater::onRequestBucketInfoReply( - const std::shared_ptr<api::RequestBucketInfoReply> & repl) + const std::shared_ptr<api::RequestBucketInfoReply>& repl) { - if (pendingClusterStateAccepted(repl)) { + if (pending_cluster_state_accepted(repl)) { return true; } - return processSingleBucketInfoReply(repl); + return process_single_bucket_info_reply(repl); } bool -BucketDBUpdater::pendingClusterStateAccepted( - const std::shared_ptr<api::RequestBucketInfoReply> & repl) +BucketDBUpdater::pending_cluster_state_accepted( + const std::shared_ptr<api::RequestBucketInfoReply>& repl) { - if (_pendingClusterState.get() - && _pendingClusterState->onRequestBucketInfoReply(repl)) + if (_pending_cluster_state.get() + && _pending_cluster_state->onRequestBucketInfoReply(repl)) { - if (isPendingClusterStateCompleted()) { - processCompletedPendingClusterState(); + if (is_pending_cluster_state_completed()) { + auto guard = _stripe_accessor.rendezvous_and_hold_all(); + process_completed_pending_cluster_state(*guard); } return true; } - LOG(spam, - "Reply %s was not accepted by pending cluster state", + LOG(spam, "Reply %s was not accepted by pending cluster state", repl->toString().c_str()); return false; } void -BucketDBUpdater::handleSingleBucketInfoFailure( +BucketDBUpdater::handle_single_bucket_info_failure( const std::shared_ptr<api::RequestBucketInfoReply>& repl, const BucketRequest& req) { @@ -600,31 +434,31 @@ BucketDBUpdater::handleSingleBucketInfoFailure( if (req.bucket.getBucketId() != document::BucketId(0)) { framework::MilliSecTime sendTime(_node_ctx.clock()); sendTime += framework::MilliSecTime(100); - _delayedRequests.emplace_back(sendTime, req); + _delayed_requests.emplace_back(sendTime, req); } } void -BucketDBUpdater::resendDelayedMessages() +BucketDBUpdater::resend_delayed_messages() { - if (_pendingClusterState) { - _pendingClusterState->resendDelayedMessages(); + if (_pending_cluster_state) { + _pending_cluster_state->resendDelayedMessages(); } - if (_delayedRequests.empty()) { + if (_delayed_requests.empty()) { return; // Don't fetch time if not needed } framework::MilliSecTime currentTime(_node_ctx.clock()); - while (!_delayedRequests.empty() - && currentTime >= _delayedRequests.front().first) + while (!_delayed_requests.empty() + && currentTime >= _delayed_requests.front().first) { - BucketRequest& req(_delayedRequests.front().second); - sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>()); - _delayedRequests.pop_front(); + BucketRequest& req(_delayed_requests.front().second); + send_request_bucket_info(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>()); + _delayed_requests.pop_front(); } } void -BucketDBUpdater::convertBucketInfoToBucketList( +BucketDBUpdater::convert_bucket_info_to_bucket_list( const 
std::shared_ptr<api::RequestBucketInfoReply>& repl, uint16_t targetNode, BucketListMerger::BucketList& newList) { @@ -637,51 +471,51 @@ BucketDBUpdater::convertBucketInfoToBucketList( } void -BucketDBUpdater::mergeBucketInfoWithDatabase( +BucketDBUpdater::merge_bucket_info_with_database( const std::shared_ptr<api::RequestBucketInfoReply>& repl, const BucketRequest& req) { BucketListMerger::BucketList existing; BucketListMerger::BucketList newList; - findRelatedBucketsInDatabase(req.targetNode, req.bucket, existing); - convertBucketInfoToBucketList(repl, req.targetNode, newList); + find_related_buckets_in_database(req.targetNode, req.bucket, existing); + convert_bucket_info_to_bucket_list(repl, req.targetNode, newList); std::sort(existing.begin(), existing.end(), sort_pred); std::sort(newList.begin(), newList.end(), sort_pred); BucketListMerger merger(newList, existing, req.timestamp); - updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger); + update_database(req.bucket.getBucketSpace(), req.targetNode, merger); } bool -BucketDBUpdater::processSingleBucketInfoReply( +BucketDBUpdater::process_single_bucket_info_reply( const std::shared_ptr<api::RequestBucketInfoReply> & repl) { - auto iter = _sentMessages.find(repl->getMsgId()); + auto iter = _sent_messages.find(repl->getMsgId()); // Has probably been deleted for some reason earlier. - if (iter == _sentMessages.end()) { + if (iter == _sent_messages.end()) { return true; } BucketRequest req = iter->second; - _sentMessages.erase(iter); + _sent_messages.erase(iter); if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { // Ignore replies from nodes that are down. return true; } if (repl->getResult().getResult() != api::ReturnCode::OK) { - handleSingleBucketInfoFailure(repl, req); + handle_single_bucket_info_failure(repl, req); return true; } - mergeBucketInfoWithDatabase(repl, req); + merge_bucket_info_with_database(repl, req); return true; } void -BucketDBUpdater::addBucketInfoForNode( +BucketDBUpdater::add_bucket_info_for_node( const BucketDatabase::Entry& e, uint16_t node, BucketListMerger::BucketList& existing) const @@ -693,20 +527,20 @@ BucketDBUpdater::addBucketInfoForNode( } void -BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, - BucketListMerger::BucketList& existing) +BucketDBUpdater::find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket, + BucketListMerger::BucketList& existing) { auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace())); std::vector<BucketDatabase::Entry> entries; distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries); for (const BucketDatabase::Entry & entry : entries) { - addBucketInfoForNode(entry, node, existing); + add_bucket_info_for_node(entry, node, existing); } } void -BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger) +BucketDBUpdater::update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger) { for (const document::BucketId & bucketId : merger.getRemovedEntries()) { document::Bucket bucket(bucketSpace, bucketId); @@ -723,18 +557,18 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node } bool -BucketDBUpdater::isPendingClusterStateCompleted() const +BucketDBUpdater::is_pending_cluster_state_completed() const { - return _pendingClusterState.get() && _pendingClusterState->done(); + return _pending_cluster_state.get() && 
_pending_cluster_state->done(); } void -BucketDBUpdater::processCompletedPendingClusterState() +BucketDBUpdater::process_completed_pending_cluster_state(StripeAccessGuard& guard) { - if (_pendingClusterState->isDeferred()) { + if (_pending_cluster_state->isDeferred()) { LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated", - _pendingClusterState->clusterStateVersion()); - assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands. + _pending_cluster_state->clusterStateVersion()); + assert(_pending_cluster_state->hasCommand()); // Deferred transitions should only ever be created by state commands. // Sending down SetSystemState command will reach the state manager and a reply // will be auto-sent back to the cluster controller in charge. Once this happens, // it will send an explicit activation command once all distributors have reported @@ -743,73 +577,81 @@ BucketDBUpdater::processCompletedPendingClusterState() // taken effect via activation. External operation handler will keep operations from // actually being scheduled until state has been activated. The external operation handler // needs to be explicitly aware of the case where no state has yet to be activated. - _distributor_interface.getMessageSender().sendDown( - _pendingClusterState->getCommand()); - _pendingClusterState->clearCommand(); + _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand()); + _pending_cluster_state->clearCommand(); return; } // Distribution config change or non-deferred cluster state. Immediately activate // the pending state without being told to do so explicitly. - activatePendingClusterState(); + activate_pending_cluster_state(guard); } void -BucketDBUpdater::activatePendingClusterState() +BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard) { framework::MilliSecTimer process_timer(_node_ctx.clock()); - _pendingClusterState->mergeIntoBucketDatabases(); + _pending_cluster_state->merge_into_bucket_databases(guard); maybe_inject_simulated_db_merging_delay(); - if (_pendingClusterState->isVersionedTransition()) { - LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); - enableCurrentClusterStateBundleInDistributor(); - if (_pendingClusterState->hasCommand()) { - _distributor_interface.getMessageSender().sendDown( - _pendingClusterState->getCommand()); + if (_pending_cluster_state->isVersionedTransition()) { + LOG(debug, "Activating pending cluster state version %u", _pending_cluster_state->clusterStateVersion()); + enable_current_cluster_state_bundle_in_distributor_and_stripes(guard); + if (_pending_cluster_state->hasCommand()) { + _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand()); } - addCurrentStateToClusterStateHistory(); + add_current_state_to_cluster_state_history(); } else { LOG(debug, "Activating pending distribution config"); // TODO distribution changes cannot currently be deferred as they are not // initiated by the cluster controller! - _distributor_interface.notifyDistributionChangeEnabled(); + _distributor_interface.notifyDistributionChangeEnabled(); // TODO factor these two out into one func? 
+ guard.notify_distribution_change_enabled(); } - update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); - _pendingClusterState.reset(); - _outdatedNodesMap.clear(); - _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); - sendAllQueuedBucketRechecks(); - completeTransitionTimer(); - clearReadOnlyBucketRepoDatabases(); + guard.update_read_snapshot_after_activation(_pending_cluster_state->getNewClusterStateBundle()); + _pending_cluster_state.reset(); + _outdated_nodes_map.clear(); + guard.clear_pending_cluster_state_bundle(); + send_all_queued_bucket_rechecks(); + complete_transition_timer(); + guard.clear_read_only_bucket_repo_databases(); _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); } void -BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() +BucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard) { - const lib::ClusterStateBundle& state( - _pendingClusterState->getNewClusterStateBundle()); + const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle(); - LOG(debug, - "BucketDBUpdater finished processing state %s", + _active_state_bundle = _pending_cluster_state->getNewClusterStateBundle(); + propagate_active_state_bundle_internally(); + + LOG(debug, "BucketDBUpdater finished processing state %s", state.getBaselineClusterState()->toString().c_str()); + // First enable the cluster state for the _top-level_ distributor component. _distributor_interface.enableClusterStateBundle(state); + // And then subsequently for all underlying stripes. Technically the order doesn't matter + // since all threads are blocked at this point. + guard.enable_cluster_state_bundle(state); } void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { - update_read_snapshot_after_activation(activated_state); + auto guard = _stripe_accessor.rendezvous_and_hold_all(); _distributor_interface.enableClusterStateBundle(activated_state); + guard->enable_cluster_state_bundle(activated_state); + + _active_state_bundle = activated_state; + propagate_active_state_bundle_internally(); } void -BucketDBUpdater::addCurrentStateToClusterStateHistory() +BucketDBUpdater::add_current_state_to_cluster_state_history() { - _history.push_back(_pendingClusterState->getSummary()); + _history.push_back(_pending_cluster_state->getSummary()); if (_history.size() > 50) { _history.pop_front(); @@ -857,22 +699,22 @@ BucketDBUpdater::reportStatus(std::ostream& out, xos << XmlTag("status") << XmlAttribute("id", BUCKETDB) << XmlAttribute("name", BUCKETDB_UPDATER); - reportXmlStatus(xos, path); + report_xml_status(xos, path); xos << XmlEndTag(); return true; } vespalib::string -BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, - const framework::HttpUrlPath&) const +BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos, + const framework::HttpUrlPath&) const { using namespace vespalib::xml; xos << XmlTag("bucketdb") << XmlTag("systemstate_active") << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString()) << XmlEndTag(); - if (_pendingClusterState) { - xos << *_pendingClusterState; + if (_pending_cluster_state) { + xos << *_pending_cluster_state; } xos << XmlTag("systemstate_history"); for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) { @@ -884,13 +726,13 @@ 
BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, } xos << XmlEndTag() << XmlTag("single_bucket_requests"); - for (const auto & entry : _sentMessages) + for (const auto & entry : _sent_messages) { entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp)); } xos << XmlEndTag() << XmlTag("delayed_single_bucket_requests"); - for (const auto & entry : _delayedRequests) + for (const auto & entry : _delayed_requests) { entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime())); } @@ -898,166 +740,4 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, return ""; } -BucketDBUpdater::MergingNodeRemover::MergingNodeRemover( - const lib::ClusterState& oldState, - const lib::ClusterState& s, - uint16_t localIndex, - const lib::Distribution& distribution, - const char* upStates, - bool track_non_owned_entries) - : _oldState(oldState), - _state(s), - _available_nodes(), - _nonOwnedBuckets(), - _removed_buckets(0), - _removed_documents(0), - _localIndex(localIndex), - _distribution(distribution), - _upStates(upStates), - _track_non_owned_entries(track_non_owned_entries), - _cachedDecisionSuperbucket(UINT64_MAX), - _cachedOwned(false) -{ - // TODO intersection of cluster state and distribution config - const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE); - _available_nodes.resize(storage_count); - for (uint16_t i = 0; i < storage_count; ++i) { - if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) { - _available_nodes[i] = true; - } - } -} - -void -BucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const -{ - LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg); -} - -namespace { - -uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept { - // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest. - return id.getRawId() & ~(UINT64_MAX << distribution_bits); -} - -} - -bool -BucketDBUpdater::MergingNodeRemover::distributorOwnsBucket( - const document::BucketId& bucketId) const -{ - // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor. 
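// Worked example of the superbucket masking above (illustrative values only): with 16
// distribution bits, ~(UINT64_MAX << 16) == 0xFFFF, so
//   superbucket_from_id(BucketId(0x4000000000001234), 16) == 0x1234
//   superbucket_from_id(BucketId(0x8000000000011234), 16) == 0x1234
// Bucket DB keys are bit-reversed IDs, so entries sharing a superbucket are adjacent
// during iteration, and the single-entry _cachedDecisionSuperbucket/_cachedOwned cache
// avoids recomputing the ideal distributor node for each of them.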
- // TODO "too few bits used" case can be cheaply checked without needing exception - try { - const auto bits = _state.getDistributionBitCount(); - const auto this_superbucket = superbucket_from_id(bucketId, bits); - if (_cachedDecisionSuperbucket == this_superbucket) { - if (!_cachedOwned) { - logRemove(bucketId, "bucket now owned by another distributor (cached)"); - } - return _cachedOwned; - } - - uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim"); - _cachedDecisionSuperbucket = this_superbucket; - _cachedOwned = (distributor == _localIndex); - if (!_cachedOwned) { - logRemove(bucketId, "bucket now owned by another distributor"); - return false; - } - return true; - } catch (lib::TooFewBucketBitsInUseException& exc) { - logRemove(bucketId, "using too few distribution bits now"); - } catch (lib::NoDistributorsAvailableException& exc) { - logRemove(bucketId, "no distributors are available"); - } - return false; -} - -void -BucketDBUpdater::MergingNodeRemover::setCopiesInEntry( - BucketDatabase::Entry& e, - const std::vector<BucketCopy>& copies) const -{ - e->clear(); - - std::vector<uint16_t> order = - _distribution.getIdealStorageNodes(_state, e.getBucketId(), _upStates); - - e->addNodes(copies, order); - - LOG(spam, "Changed %s", e->toString().c_str()); -} - -bool -BucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const -{ - const uint16_t n_nodes = e->getNodeCount(); - for (uint16_t i = 0; i < n_nodes; i++) { - const uint16_t node_idx = e->getNodeRef(i).getNode(); - if (!storage_node_is_available(node_idx)) { - return true; - } - } - return false; -} - -BucketDatabase::MergingProcessor::Result -BucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger) -{ - document::BucketId bucketId(merger.bucket_id()); - LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str()); - if (!distributorOwnsBucket(bucketId)) { - // TODO remove in favor of DB snapshotting - if (_track_non_owned_entries) { - _nonOwnedBuckets.emplace_back(merger.current_entry()); - } - return Result::Skip; - } - auto& e = merger.current_entry(); - - if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger? - return Result::Skip; - } - - if (!has_unavailable_nodes(e)) { - return Result::KeepUnchanged; - } - - std::vector<BucketCopy> remainingCopies; - for (uint16_t i = 0; i < e->getNodeCount(); i++) { - const uint16_t node_idx = e->getNodeRef(i).getNode(); - if (storage_node_is_available(node_idx)) { - remainingCopies.push_back(e->getNodeRef(i)); - } - } - - if (remainingCopies.empty()) { - ++_removed_buckets; - _removed_documents += e->getHighestDocumentCount(); - return Result::Skip; - } else { - setCopiesInEntry(e, remainingCopies); - return Result::Update; - } -} - -bool -BucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept -{ - return ((index < _available_nodes.size()) && _available_nodes[index]); -} - -BucketDBUpdater::MergingNodeRemover::~MergingNodeRemover() -{ - if (_removed_buckets != 0) { - LOGBM(info, "After cluster state change %s, %zu buckets no longer " - "have available replicas. 
%zu documents in these buckets will " - "be unavailable until nodes come back up", - _oldState.getTextualDifference(_state).c_str(), - _removed_buckets, _removed_documents); - } -} - } // distributor diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 375a5cee4e7..1a4677ee507 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -26,8 +26,11 @@ class XmlAttribute; namespace storage::distributor { -class DistributorStripeInterface; +class BucketSpaceDistributionConfigs; class BucketSpaceDistributionContext; +class DistributorStripeInterface; +class StripeAccessor; +class StripeAccessGuard; class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler @@ -35,41 +38,31 @@ class BucketDBUpdater : public framework::StatusReporter, public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; BucketDBUpdater(DistributorStripeInterface& owner, - DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, - DistributorComponentRegister& compReg); + DistributorComponentRegister& comp_reg, + StripeAccessor& stripe_accessor); ~BucketDBUpdater() override; void flush(); - const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const; - void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket); bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override; bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override; bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override; bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override; - bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override; - void resendDelayedMessages(); - void storageDistributionChanged(); - vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const; vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; + + void resend_delayed_messages(); + void storage_distribution_changed(const BucketSpaceDistributionConfigs& configs); + void bootstrap_distribution_config(std::shared_ptr<const lib::Distribution>); + + vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const; + void print(std::ostream& out, bool verbose, const std::string& indent) const; const DistributorNodeContext& node_context() const { return _node_ctx; } DistributorOperationContext& operation_context() { return _op_ctx; } - /** - * Returns whether the current PendingClusterState indicates that there has - * been a transfer of bucket ownership amongst the distributors in the - * cluster. This method only makes sense to call when _pendingClusterState - * is active, such as from within a enableClusterState() call. 
- */ - bool bucketOwnershipHasChanged() const { - return ((_pendingClusterState.get() != nullptr) - && _pendingClusterState->hasBucketOwnershipTransfer()); - } void set_stale_reads_enabled(bool enabled) noexcept { _stale_reads_enabled.store(enabled, std::memory_order_relaxed); } @@ -77,7 +70,6 @@ public: return _stale_reads_enabled.load(std::memory_order_relaxed); } - OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const; private: class MergeReplyGuard { public: @@ -141,126 +133,78 @@ private: // Transitively invokes Distributor::enableClusterStateBundle void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state); - bool shouldDeferStateEnabling() const noexcept; - bool hasPendingClusterState() const; - bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); - bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl); - void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl, - const BucketRequest& req); - bool isPendingClusterStateCompleted() const; - void processCompletedPendingClusterState(); - void activatePendingClusterState(); - void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl, - const BucketRequest& req); - void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl, - uint16_t targetNode, BucketListMerger::BucketList& newList); - void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket, - const std::shared_ptr<MergeReplyGuard>& mergeReply); - void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node, - BucketListMerger::BucketList& existing) const; - void ensureTransitionTimerStarted(); - void completeTransitionTimer(); - void clearReadOnlyBucketRepoDatabases(); + bool should_defer_state_enabling() const noexcept; + bool has_pending_cluster_state() const; + bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); + bool process_single_bucket_info_reply(const std::shared_ptr<api::RequestBucketInfoReply>& repl); + void handle_single_bucket_info_failure(const std::shared_ptr<api::RequestBucketInfoReply>& repl, + const BucketRequest& req); + bool is_pending_cluster_state_completed() const; + void process_completed_pending_cluster_state(StripeAccessGuard& guard); + void activate_pending_cluster_state(StripeAccessGuard& guard); + void merge_bucket_info_with_database(const std::shared_ptr<api::RequestBucketInfoReply>& repl, + const BucketRequest& req); + void convert_bucket_info_to_bucket_list(const std::shared_ptr<api::RequestBucketInfoReply>& repl, + uint16_t targetNode, BucketListMerger::BucketList& newList); + void send_request_bucket_info(uint16_t node, const document::Bucket& bucket, + const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard); + void add_bucket_info_for_node(const BucketDatabase::Entry& e, uint16_t node, + BucketListMerger::BucketList& existing) const; + void ensure_transition_timer_started(); + void complete_transition_timer(); /** * Adds all buckets contained in the bucket database * that are either contained * in bucketId, or that bucketId is contained in, that have copies * on the given node. 
*/ - void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, - BucketListMerger::BucketList& existing); + void find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket, + BucketListMerger::BucketList& existing); /** Updates the bucket database from the information generated by the given bucket list merger. */ - void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger); + void update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger); - void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState); + void remove_superfluous_buckets(StripeAccessGuard& guard, + const lib::ClusterStateBundle& new_state, + bool is_distribution_config_change); - void update_read_snapshot_before_db_pruning(); - void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState, - bool is_distribution_config_change); - void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state); - void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state); - - void replyToPreviousPendingClusterStateIfAny(); - void replyToActivationWithActualVersion( + void reply_to_previous_pending_cluster_state_if_any(); + void reply_to_activation_with_actual_version( const api::ActivateClusterStateVersionCommand& cmd, uint32_t actualVersion); - void enableCurrentClusterStateBundleInDistributor(); - void addCurrentStateToClusterStateHistory(); - void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&); - void sendAllQueuedBucketRechecks(); + void enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard); + void add_current_state_to_cluster_state_history(); + void enqueue_recheck_until_pending_state_enabled(uint16_t node, const document::Bucket& bucket); + void send_all_queued_bucket_rechecks(); + + void propagate_active_state_bundle_internally(); void maybe_inject_simulated_db_pruning_delay(); void maybe_inject_simulated_db_merging_delay(); - /** - Removes all copies of buckets that are on nodes that are down. 
- */ - class MergingNodeRemover : public BucketDatabase::MergingProcessor { - public: - MergingNodeRemover(const lib::ClusterState& oldState, - const lib::ClusterState& s, - uint16_t localIndex, - const lib::Distribution& distribution, - const char* upStates, - bool track_non_owned_entries); - ~MergingNodeRemover() override; - - Result merge(BucketDatabase::Merger&) override; - void logRemove(const document::BucketId& bucketId, const char* msg) const; - bool distributorOwnsBucket(const document::BucketId&) const; - - const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept { - return _nonOwnedBuckets; - } - private: - void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const; - - bool has_unavailable_nodes(const BucketDatabase::Entry&) const; - bool storage_node_is_available(uint16_t index) const noexcept; - - const lib::ClusterState _oldState; - const lib::ClusterState _state; - std::vector<bool> _available_nodes; - std::vector<BucketDatabase::Entry> _nonOwnedBuckets; - size_t _removed_buckets; - size_t _removed_documents; - - uint16_t _localIndex; - const lib::Distribution& _distribution; - const char* _upStates; - bool _track_non_owned_entries; - - mutable uint64_t _cachedDecisionSuperbucket; - mutable bool _cachedOwned; - }; + // TODO STRIPE remove once distributor component dependencies have been pruned + StripeAccessor& _stripe_accessor; + lib::ClusterStateBundle _active_state_bundle; + std::unique_ptr<DistributorBucketSpaceRepo> _dummy_mutable_bucket_space_repo; + std::unique_ptr<DistributorBucketSpaceRepo> _dummy_read_only_bucket_space_repo; - DistributorStripeComponent _distributorComponent; + DistributorStripeComponent _distributor_component; const DistributorNodeContext& _node_ctx; DistributorOperationContext& _op_ctx; DistributorStripeInterface& _distributor_interface; - std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests; - std::map<uint64_t, BucketRequest> _sentMessages; - std::unique_ptr<PendingClusterState> _pendingClusterState; + std::deque<std::pair<framework::MilliSecTime, BucketRequest>> _delayed_requests; + std::map<uint64_t, BucketRequest> _sent_messages; + std::unique_ptr<PendingClusterState> _pending_cluster_state; std::list<PendingClusterState::Summary> _history; DistributorMessageSender& _sender; - std::set<EnqueuedBucketRecheck> _enqueuedRechecks; - OutdatedNodesMap _outdatedNodesMap; - framework::MilliSecTimer _transitionTimer; + std::set<EnqueuedBucketRecheck> _enqueued_rechecks; + OutdatedNodesMap _outdated_nodes_map; + framework::MilliSecTimer _transition_timer; std::atomic<bool> _stale_reads_enabled; - using DistributionContexts = std::unordered_map<document::BucketSpace, - std::shared_ptr<BucketSpaceDistributionContext>, - document::BucketSpace::hash>; - DistributionContexts _active_distribution_contexts; - using DbGuards = std::unordered_map<document::BucketSpace, - std::shared_ptr<BucketDatabase::ReadGuard>, - document::BucketSpace::hash>; - DbGuards _explicit_transition_read_guard; - mutable std::mutex _distribution_context_mutex; }; } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index f886640d1f9..0f63de5cd85 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -1,12 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
// #include "blockingoperationstarter.h" +#include "bucket_space_distribution_configs.h" +#include "bucketdbupdater.h" #include "distributor.h" #include "distributor_bucket_space.h" #include "distributor_status.h" #include "distributor_stripe.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" +#include "legacy_single_stripe_accessor.h" #include "operation_sequencer.h" #include "ownership_transfer_safe_time_point_calculator.h" #include "throttlingoperationstarter.h" @@ -47,16 +50,21 @@ Distributor::Distributor(DistributorComponentRegister& compReg, ChainedMessageSender* messageSender) : StorageLink("distributor"), framework::StatusReporter("distributor", "Distributor"), + _comp_reg(compReg), _metrics(std::make_shared<DistributorMetricSet>()), _messageSender(messageSender), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler, manageActiveBucketCopies, *this)), + _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)), _component(compReg, "distributor"), + _bucket_db_updater(), _distributorStatusDelegate(compReg, *this, *this), _threadPool(threadPool), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), _metricUpdateHook(*this), - _hostInfoReporter(*this, *this) + _hostInfoReporter(*this, *this), + _distribution(), + _next_distribution() { _component.registerMetric(*_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); @@ -113,12 +121,12 @@ Distributor::distributor_component() noexcept { return _stripe->_component; } -BucketDBUpdater& +StripeBucketDBUpdater& Distributor::bucket_db_updater() { return _stripe->bucket_db_updater(); } -const BucketDBUpdater& +const StripeBucketDBUpdater& Distributor::bucket_db_updater() const { return _stripe->bucket_db_updater(); } @@ -188,6 +196,9 @@ Distributor::onOpen() void Distributor::onClose() { LOG(debug, "Distributor::onClose invoked"); _stripe->close(); + if (_bucket_db_updater) { + _bucket_db_updater->flush(); + } } void @@ -210,18 +221,47 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg) } } +namespace { + +bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& msg) noexcept { + switch (msg.getType().getId()) { + case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: + return true; + case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: + // Top-level component should only handle replies for full bucket info fetches. + // Bucket-specific requests should go to the stripes that sent them. + return dynamic_cast<const api::RequestBucketInfoReply&>(msg).full_bucket_fetch(); + default: + return false; + } +} + +} + bool Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { + // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread + // regardless of what RPC thread (comm mgr, FRT...) this is called from! + if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) { + return msg->callHandler(*_bucket_db_updater, msg); + } + // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? + // that covers most operations already... 
return _stripe->onDown(msg); } bool Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) { + if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) { + return reply->callHandler(*_bucket_db_updater, reply); + } return _stripe->handleReply(reply); } +// TODO STRIPE we need to reintroduce the top-level message queue... bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { @@ -245,21 +285,42 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) void Distributor::storageDistributionChanged() { - // May happen from any thread. - _stripe->storageDistributionChanged(); + if (_bucket_db_updater) { + if (!_distribution || (*_component.getDistribution() != *_distribution)) { + LOG(debug, "Distribution changed to %s, must re-fetch bucket information", + _component.getDistribution()->toString().c_str()); + _next_distribution = _component.getDistribution(); // FIXME this is not thread safe + } else { + LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s", + _component.getDistribution()->toString().c_str(), + _distribution->toString().c_str()); + } + } else { + // May happen from any thread. + _stripe->storageDistributionChanged(); + } } void Distributor::enableNextDistribution() { - _stripe->enableNextDistribution(); + if (_next_distribution && _bucket_db_updater) { + _distribution = _next_distribution; + _next_distribution = std::shared_ptr<lib::Distribution>(); + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); + _bucket_db_updater->storage_distribution_changed(new_configs); + } else { + _stripe->enableNextDistribution(); + } } // TODO STRIPE only used by tests to directly inject new distribution config +// - actually, also by ctor void Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { + // TODO STRIPE top-level bucket DB updater _stripe->propagateDefaultDistribution(std::move(distribution)); } @@ -299,6 +360,9 @@ framework::ThreadWaitInfo Distributor::doCriticalTick(framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + if (_bucket_db_updater) { + enableNextDistribution(); + } // Propagates any new configs down to stripe(s) enableNextConfig(); _stripe->doCriticalTick(idx); @@ -309,6 +373,9 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) framework::ThreadWaitInfo Distributor::doNonCriticalTick(framework::ThreadIndex idx) { + if (_bucket_db_updater) { + _bucket_db_updater->resend_delayed_messages(); + } // TODO STRIPE stripes need their own thread loops! _stripe->doNonCriticalTick(idx); _tickResult = _stripe->_tickResult; @@ -318,6 +385,11 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx) void Distributor::enableNextConfig() { + // FIXME STRIPE enforce this cannot happen live, only valid for startup config edge + if (getConfig().num_distributor_stripes() > 0 && !_bucket_db_updater) { + // FIXME STRIPE using the singular stripe here is a temporary Hack McHack Deluxe 3000! 
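// (Annotation: passing *_stripe twice matches the new BucketDBUpdater constructor
// shown earlier in this diff; the first argument binds the stripe as the owning
// DistributorStripeInterface, the second as the DistributorMessageSender.)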
+ _bucket_db_updater = std::make_unique<BucketDBUpdater>(*_stripe, *_stripe, _comp_reg, *_stripe_accessor); + } _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index bfffe126b44..9ad6425afe9 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -11,6 +11,7 @@ #include "min_replica_provider.h" #include "pendingmessagetracker.h" #include "statusreporterdelegate.h" +#include "stripe_bucket_db_updater.h" // TODO this is temporary #include <vespa/config/config.h> #include <vespa/storage/common/distributorcomponent.h> #include <vespa/storage/common/doneinitializehandler.h> @@ -33,10 +34,12 @@ namespace storage::distributor { class BlockingOperationStarter; class BucketPriorityDatabase; +class BucketDBUpdater; class DistributorBucketSpaceRepo; class DistributorStatus; class DistributorStripe; class OperationSequencer; +class LegacySingleStripeAccessor; class OwnershipTransferSafeTimePointCalculator; class SimpleMaintenanceScanner; class ThrottlingOperationStarter; @@ -135,8 +138,8 @@ private: bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); // Accessors used by tests - BucketDBUpdater& bucket_db_updater(); - const BucketDBUpdater& bucket_db_updater() const; + StripeBucketDBUpdater& bucket_db_updater(); + const StripeBucketDBUpdater& bucket_db_updater() const; IdealStateManager& ideal_state_manager(); const IdealStateManager& ideal_state_manager() const; ExternalOperationHandler& external_operation_handler(); @@ -162,16 +165,22 @@ private: void enableNextDistribution(); void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); + DistributorComponentRegister& _comp_reg; std::shared_ptr<DistributorMetricSet> _metrics; ChainedMessageSender* _messageSender; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. 
std::unique_ptr<DistributorStripe> _stripe; + std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor; storage::DistributorComponent _component; + std::unique_ptr<BucketDBUpdater> _bucket_db_updater; StatusReporterDelegate _distributorStatusDelegate; framework::TickingThreadPool& _threadPool; framework::ThreadWaitInfo _tickResult; MetricUpdateHook _metricUpdateHook; DistributorHostInfoReporter _hostInfoReporter; + + std::shared_ptr<lib::Distribution> _distribution; + std::shared_ptr<lib::Distribution> _next_distribution; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h index f4d5bd6f5aa..558cbada31f 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h @@ -80,6 +80,7 @@ public: } void set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state); + bool has_pending_cluster_state() const noexcept { return static_cast<bool>(_pending_cluster_state); } const lib::ClusterState& get_pending_cluster_state() const noexcept { return *_pending_cluster_state; } /** diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index e41c7940a0d..906129ebf96 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -478,6 +478,8 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, } } +// TODO STRIPE must only be called when operating in legacy single stripe mode! +// In other cases, distribution config switching is controlled by top-level distributor, not via framework(tm). 
void DistributorStripe::enableNextDistribution() { @@ -485,10 +487,12 @@ DistributorStripe::enableNextDistribution() _distribution = _nextDistribution; propagateDefaultDistribution(_distribution); _nextDistribution = std::shared_ptr<lib::Distribution>(); + // TODO conditional on whether top-level DB updater is in charge _bucketDBUpdater.storageDistributionChanged(); } } +// TODO STRIPE must be invoked by top-level bucket db updater probably void DistributorStripe::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) @@ -500,6 +504,19 @@ DistributorStripe::propagateDefaultDistribution( } } +// Only called when stripe is in rendezvous freeze +void +DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { + auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space()); + auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space()); + assert(default_distr && global_distr); + + for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { + repo->get(document::FixedBucketSpaces::default_space()).setDistribution(default_distr); + repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); + } +} + void DistributorStripe::propagateClusterStates() { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 10b3f54d834..4a43b93354e 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -3,7 +3,6 @@ #pragma once #include "bucket_spaces_stats_provider.h" -#include "bucketdbupdater.h" #include "distributor_host_info_reporter.h" #include "distributor_stripe_interface.h" #include "externaloperationhandler.h" @@ -11,6 +10,8 @@ #include "min_replica_provider.h" #include "pendingmessagetracker.h" #include "statusreporterdelegate.h" +#include "stripe_access_guard.h" +#include "stripe_bucket_db_updater.h" #include <vespa/config/config.h> #include <vespa/storage/common/doneinitializehandler.h> #include <vespa/storage/common/messagesender.h> @@ -98,7 +99,7 @@ public: /** * Invoked when a pending cluster state for a distribution (config) - * change has been enabled. An invocation of storageDistributionChanged + * change has been enabled. An invocation of storage_distribution_changed * will eventually cause this method to be called, assuming the pending * cluster state completed successfully. 
*/ @@ -169,8 +170,8 @@ public: return *_bucketIdHasher; } - BucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; } - const BucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; } + StripeBucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; } + const StripeBucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; } IdealStateManager& ideal_state_manager() { return _idealStateManager; } const IdealStateManager& ideal_state_manager() const { return _idealStateManager; } ExternalOperationHandler& external_operation_handler() { return _externalOperationHandler; } @@ -198,6 +199,7 @@ private: friend class DistributorTestUtil; friend class MetricUpdateHook; friend class Distributor; + friend class LegacySingleStripeAccessGuard; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); bool isMaintenanceReply(const api::StorageReply& reply) const; @@ -251,9 +253,10 @@ private: bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg, Operation::SP& operation); - void enableNextDistribution(); - void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); + void enableNextDistribution(); // TODO STRIPE remove once legacy is gone + void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone void propagateClusterStates(); + void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs); BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const; template <typename NodeFunctor> @@ -276,7 +279,7 @@ private: std::unique_ptr<OperationSequencer> _operation_sequencer; PendingMessageTracker _pendingMessageTracker; - BucketDBUpdater _bucketDBUpdater; + StripeBucketDBUpdater _bucketDBUpdater; StatusReporterDelegate _distributorStatusDelegate; StatusReporterDelegate _bucketDBStatusDelegate; IdealStateManager _idealStateManager; diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp new file mode 100644 index 00000000000..04f210205c3 --- /dev/null +++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp @@ -0,0 +1,88 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "legacy_single_stripe_accessor.h" +#include "distributor_stripe.h" + +namespace storage::distributor { + +LegacySingleStripeAccessGuard::LegacySingleStripeAccessGuard(LegacySingleStripeAccessor& accessor, + DistributorStripe& stripe) + : _accessor(accessor), + _stripe(stripe) +{} + +LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() { + _accessor.mark_guard_released(); +} + +void LegacySingleStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { + _stripe.update_distribution_config(new_configs); +} + +void LegacySingleStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) { + _stripe.getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state); + // TODO STRIPE also read only repo? +} + +void LegacySingleStripeAccessGuard::clear_pending_cluster_state_bundle() { + _stripe.getBucketSpaceRepo().clear_pending_cluster_state_bundle(); + // TODO STRIPE also read only repo? 
+} + +void LegacySingleStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) { + _stripe.enableClusterStateBundle(new_state); +} + +void LegacySingleStripeAccessGuard::notify_distribution_change_enabled() { + _stripe.notifyDistributionChangeEnabled(); +} + +PotentialDataLossReport +LegacySingleStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) +{ + return _stripe.bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); +} + +void +LegacySingleStripeAccessGuard::merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) +{ + _stripe.bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, entries); +} + +void LegacySingleStripeAccessGuard::update_read_snapshot_before_db_pruning() { + _stripe.bucket_db_updater().update_read_snapshot_before_db_pruning(); +} + +void LegacySingleStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { + _stripe.bucket_db_updater().update_read_snapshot_after_db_pruning(new_state); +} + +void LegacySingleStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { + _stripe.bucket_db_updater().update_read_snapshot_after_activation(activated_state); +} + +void LegacySingleStripeAccessGuard::clear_read_only_bucket_repo_databases() { + _stripe.bucket_db_updater().clearReadOnlyBucketRepoDatabases(); +} + +std::unique_ptr<StripeAccessGuard> LegacySingleStripeAccessor::rendezvous_and_hold_all() { + // For sanity checking during development. + assert(!_guard_held); + _guard_held = true; + return std::make_unique<LegacySingleStripeAccessGuard>(*this, _stripe); +} + +void LegacySingleStripeAccessor::mark_guard_released() { + assert(_guard_held); + _guard_held = false; +} + +} diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h new file mode 100644 index 00000000000..4f8cceb0a70 --- /dev/null +++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h @@ -0,0 +1,67 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "stripe_access_guard.h" + +namespace storage::distributor { + +class DistributorStripe; +class LegacySingleStripeAccessor; + +/** + * Very simple stripe access guard which expects the caller and its single stripe to run in the + * same thread. This means there's no actual striping of operations or any thread synchronization + * performed. Only intended as a stop-gap while we have legacy stripe behavior. 
+ */ +class LegacySingleStripeAccessGuard : public StripeAccessGuard { + LegacySingleStripeAccessor& _accessor; + DistributorStripe& _stripe; +public: + LegacySingleStripeAccessGuard(LegacySingleStripeAccessor& accessor, + DistributorStripe& stripe); + ~LegacySingleStripeAccessGuard() override; + + void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; + void clear_pending_cluster_state_bundle() override; + void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override; + void notify_distribution_change_enabled() override; + + PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) override; + void merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) override; + + void update_read_snapshot_before_db_pruning() override; + void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override; + void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override; + void clear_read_only_bucket_repo_databases() override; +}; + +/** + * Impl of StripeAccessor which creates LegacySingleStripeAccessGuards bound to a single stripe. + */ +class LegacySingleStripeAccessor : public StripeAccessor { + DistributorStripe& _stripe; + bool _guard_held; + + friend class LegacySingleStripeAccessGuard; +public: + explicit LegacySingleStripeAccessor(DistributorStripe& stripe) + : _stripe(stripe), + _guard_held(false) + {} + ~LegacySingleStripeAccessor() override = default; + + std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() override; +private: + void mark_guard_released(); +}; + +} diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index af6223afe28..8e54cfcb27d 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -4,6 +4,7 @@ #include "clusterinformation.h" #include "pendingclusterstate.h" #include "distributor_bucket_space.h" +#include "stripe_access_guard.h" #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <algorithm> @@ -18,13 +19,15 @@ using lib::NodeType; using lib::NodeState; PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + document::BucketSpace bucket_space, DistributorBucketSpace &distributorBucketSpace, bool distributionChanged, const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp) - : _entries(), + : _bucket_space(bucket_space), + _entries(), _iter(0), _removedBuckets(), _missingEntries(), @@ -53,7 +56,7 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() = default; PendingBucketSpaceDbTransition::Range -PendingBucketSpaceDbTransition::skipAllForSameBucket() 
+PendingBucketSpaceDbTransition::DbMerger::skipAllForSameBucket() { Range r(_iter, _iter); @@ -68,7 +71,7 @@ PendingBucketSpaceDbTransition::skipAllForSameBucket() } std::vector<BucketCopy> -PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range) +PendingBucketSpaceDbTransition::DbMerger::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range) { std::vector<BucketCopy> copiesToAdd; for (uint32_t i = range.first; i < range.second; ++i) { @@ -83,28 +86,21 @@ PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Ent } void -PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Range& range) +PendingBucketSpaceDbTransition::DbMerger::insertInfo(BucketDatabase::Entry& info, const Range& range) { std::vector<BucketCopy> copiesToAddOrUpdate( getCopiesThatAreNewOrAltered(info, range)); - const auto &dist(_distributorBucketSpace.getDistribution()); std::vector<uint16_t> order( - dist.getIdealStorageNodes( - _newClusterState, + _distribution.getIdealStorageNodes( + _new_state, _entries[range.first].bucket_id(), - _clusterInfo->getStorageUpStates())); + _storage_up_states)); info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); } -std::string -PendingBucketSpaceDbTransition::requestNodesToString() -{ - return _pendingClusterState.requestNodesToString(); -} - bool -PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId) +PendingBucketSpaceDbTransition::DbMerger::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId) { bool updated = false; for (uint32_t i = 0; i < e->getNodeCount();) { @@ -116,7 +112,7 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat // mark a single remaining replica as trusted even though there might // be one or more additional replicas pending merge into the database. 
if (nodeIsOutdated(entryNode) - && (info.getTimestamp() < _creationTimestamp) + && (info.getTimestamp() < _creation_timestamp) && e->removeNode(entryNode, TrustedUpdate::DEFER)) { LOG(spam, @@ -133,21 +129,21 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat } bool -PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const +PendingBucketSpaceDbTransition::DbMerger::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const { return ((_iter < _entries.size()) && (_entries[_iter].bucket_key < bucket_key)); } bool -PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const +PendingBucketSpaceDbTransition::DbMerger::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const { return _iter < _entries.size() && _entries[_iter].bucket_key == bucket_key; } using MergeResult = BucketDatabase::MergingProcessor::Result; -MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger) { +MergeResult PendingBucketSpaceDbTransition::DbMerger::merge(BucketDatabase::Merger& merger) { const uint64_t bucket_key = merger.bucket_key(); while (databaseIteratorHasPassedBucketInfoIterator(bucket_key)) { @@ -158,9 +154,7 @@ MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger auto& e = merger.current_entry(); document::BucketId bucketId(e.getBucketId()); - LOG(spam, - "Before merging info from nodes [%s], bucket %s had info %s", - requestNodesToString().c_str(), + LOG(spam, "Before merging info, bucket %s had info %s", bucketId.toString().c_str(), e.getBucketInfo().toString().c_str()); @@ -185,14 +179,14 @@ MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger return MergeResult::KeepUnchanged; } -void PendingBucketSpaceDbTransition::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) { +void PendingBucketSpaceDbTransition::DbMerger::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) { while (_iter < _entries.size()) { addToInserter(inserter, skipAllForSameBucket()); } } void -PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, const Range& range) +PendingBucketSpaceDbTransition::DbMerger::addToMerger(BucketDatabase::Merger& merger, const Range& range) { const auto bucket_id = _entries[range.first].bucket_id(); LOG(spam, "Adding new bucket %s with %d copies", @@ -202,16 +196,14 @@ PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, cons BucketDatabase::Entry e(bucket_id, BucketInfo()); insertInfo(e, range); if (e->getLastGarbageCollectionTime() == 0) { - e->setLastGarbageCollectionTime( - framework::MicroSecTime(_creationTimestamp) - .getSeconds().getTime()); + e->setLastGarbageCollectionTime(framework::MicroSecTime(_creation_timestamp).getSeconds().getTime()); } e.getBucketInfo().updateTrusted(); merger.insert_before_current(bucket_id, e); } void -PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range) +PendingBucketSpaceDbTransition::DbMerger::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range) { // TODO dedupe const auto bucket_id = _entries[range.first].bucket_id(); @@ -222,20 +214,32 @@ PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& BucketDatabase::Entry e(bucket_id, BucketInfo()); insertInfo(e, range); if (e->getLastGarbageCollectionTime() == 0) { - e->setLastGarbageCollectionTime( - 
framework::MicroSecTime(_creationTimestamp) - .getSeconds().getTime()); + e->setLastGarbageCollectionTime(framework::MicroSecTime(_creation_timestamp).getSeconds().getTime()); } e.getBucketInfo().updateTrusted(); inserter.insert_at_end(bucket_id, e); } +// TODO STRIPE remove legacy single stripe stuff void PendingBucketSpaceDbTransition::mergeIntoBucketDatabase() { BucketDatabase &db(_distributorBucketSpace.getBucketDatabase()); std::sort(_entries.begin(), _entries.end()); - db.merge(*this); + + const auto& dist = _distributorBucketSpace.getDistribution(); + DbMerger merger(_creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries); + + db.merge(merger); +} + +void +PendingBucketSpaceDbTransition::merge_into_bucket_databases(StripeAccessGuard& guard) +{ + std::sort(_entries.begin(), _entries.end()); + const auto& dist = _distributorBucketSpace.getDistribution(); + guard.merge_entries_into_db(_bucket_space, _creationTimestamp, dist, _newClusterState, + _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries); } void diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h index 232f1186879..86d2c89bf23 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -3,25 +3,30 @@ #include "pending_bucket_space_db_transition_entry.h" #include "outdated_nodes.h" +#include <vespa/document/bucket/bucketspace.h> #include <vespa/storage/bucketdb/bucketdatabase.h> #include <unordered_map> namespace storage::api { class RequestBucketInfoReply; } -namespace storage::lib { class ClusterState; class State; } +namespace storage::lib { +class ClusterState; +class Distribution; +class State; +} namespace storage::distributor { class ClusterInformation; class PendingClusterState; class DistributorBucketSpace; +class StripeAccessGuard; /** * Class used by PendingClusterState to track request bucket info * reply result within a bucket space and apply it to the distributor * bucket database when switching to the pending cluster state. */ -class PendingBucketSpaceDbTransition : public BucketDatabase::MergingProcessor -{ +class PendingBucketSpaceDbTransition { public: using Entry = dbtransition::Entry; using EntryList = std::vector<Entry>; @@ -29,6 +34,7 @@ public: private: using Range = std::pair<uint32_t, uint32_t>; + document::BucketSpace _bucket_space; EntryList _entries; uint32_t _iter; std::vector<document::BucketId> _removedBuckets; @@ -42,45 +48,16 @@ private: // may be down and thus cannot get a request. 
OutdatedNodes _outdatedNodes; - const lib::ClusterState &_prevClusterState; - const lib::ClusterState &_newClusterState; + const lib::ClusterState& _prevClusterState; + const lib::ClusterState& _newClusterState; const api::Timestamp _creationTimestamp; - const PendingClusterState &_pendingClusterState; - DistributorBucketSpace &_distributorBucketSpace; + const PendingClusterState& _pendingClusterState; + DistributorBucketSpace& _distributorBucketSpace; uint16_t _distributorIndex; bool _bucketOwnershipTransfer; std::unordered_map<uint16_t, size_t> _rejectedRequests; std::unordered_map<uint16_t, size_t> _failed_requests; // Also includes rejections - BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override; - void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override; - - /** - * Skips through all entries for the same bucket and returns - * the range in the entry list for which they were found. - * The range is [from, to> - */ - Range skipAllForSameBucket(); - - std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range); - void insertInfo(BucketDatabase::Entry& info, const Range& range); - void addToMerger(BucketDatabase::Merger& merger, const Range& range); - void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range); - - bool nodeIsOutdated(uint16_t node) const { - return (_outdatedNodes.find(node) != _outdatedNodes.end()); - } - - // Returns whether at least one replica was removed from the entry. - // Does NOT implicitly update trusted status on remaining replicas; caller must do - // this explicitly. - bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId); - - // Helper methods for iterating over _entries - bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const; - bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const; - std::string requestNodesToString(); - bool distributorChanged(); static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw); bool storageNodeUpInNewState(uint16_t node) const; @@ -94,17 +71,75 @@ private: void updateSetOfNodesThatAreOutdated(); public: + // Abstracts away the details of how an entry list gathered from content nodes + // is actually diffed and merged into a database. + class DbMerger : public BucketDatabase::MergingProcessor { + api::Timestamp _creation_timestamp; + const lib::Distribution& _distribution; + const lib::ClusterState& _new_state; + const char* _storage_up_states; + const std::unordered_set<uint16_t>& _outdated_nodes; // TODO hash_set + const std::vector<dbtransition::Entry>& _entries; + uint32_t _iter; + public: + DbMerger(api::Timestamp creation_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) + : _creation_timestamp(creation_timestamp), + _distribution(distribution), + _new_state(new_state), + _storage_up_states(storage_up_states), + _outdated_nodes(outdated_nodes), + _entries(entries), + _iter(0) + {} + ~DbMerger() override = default; + + BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override; + void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override; + + /** + * Skips through all entries for the same bucket and returns + * the range in the entry list for which they were found. 
+ * The range is [from, to> + */ + Range skipAllForSameBucket(); + + std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range); + void insertInfo(BucketDatabase::Entry& info, const Range& range); + void addToMerger(BucketDatabase::Merger& merger, const Range& range); + void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range); + + // Returns whether at least one replica was removed from the entry. + // Does NOT implicitly update trusted status on remaining replicas; caller must do + // this explicitly. + bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId); + + // Helper methods for iterating over _entries + bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const; + bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const; + + bool nodeIsOutdated(uint16_t node) const { + return (_outdated_nodes.find(node) != _outdated_nodes.end()); + } + }; + PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + document::BucketSpace bucket_space, DistributorBucketSpace &distributorBucketSpace, bool distributionChanged, const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp); - ~PendingBucketSpaceDbTransition() override; + ~PendingBucketSpaceDbTransition(); // Merges all the results with the corresponding bucket database. void mergeIntoBucketDatabase(); + void merge_into_bucket_databases(StripeAccessGuard& guard); // Adds the info from the reply to our list of information. void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index a7fd5a5af53..9e811ea9d23 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -84,9 +84,9 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, auto onItr = outdatedNodesMap.find(elem.first); const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? 
emptyOutdatedNodes : onItr->second; auto pendingTransition = - std::make_unique<PendingBucketSpaceDbTransition> - (*this, *elem.second, distributionChanged, outdatedNodes, - _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp); + std::make_unique<PendingBucketSpaceDbTransition>( + *this, elem.first, *elem.second, distributionChanged, outdatedNodes, + _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp); if (pendingTransition->getBucketOwnershipTransfer()) { _bucketOwnershipTransfer = true; } @@ -331,6 +331,14 @@ PendingClusterState::mergeIntoBucketDatabases() } void +PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard) +{ + for (auto &elem : _pendingTransitions) { + elem.second->merge_into_bucket_databases(guard); + } +} + +void PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const { using namespace vespalib::xml; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 42b7bf0dcf2..af0c85fab95 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -18,6 +18,7 @@ namespace storage::distributor { class DistributorMessageSender; class PendingBucketSpaceDbTransition; class DistributorBucketSpaceRepo; +class StripeAccessGuard; /** * Class used by BucketDBUpdater to track request bucket info @@ -146,6 +147,8 @@ public: * Merges all the results with the corresponding bucket databases. */ void mergeIntoBucketDatabases(); + void merge_into_bucket_databases(StripeAccessGuard& guard); + // Get pending transition for a specific bucket space. Only used by unit test. PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace); diff --git a/storage/src/vespa/storage/distributor/potential_data_loss_report.h b/storage/src/vespa/storage/distributor/potential_data_loss_report.h new file mode 100644 index 00000000000..96abd787649 --- /dev/null +++ b/storage/src/vespa/storage/distributor/potential_data_loss_report.h @@ -0,0 +1,22 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <cstdint> + +namespace storage::distributor { + +/** + * Represents the amount of data a distributor reasons _may_ have become unavailable + * due to all bucket replicas no longer being present. + */ +struct PotentialDataLossReport { + size_t buckets = 0; + size_t documents = 0; + + void merge(const PotentialDataLossReport& other) noexcept { + buckets += other.buckets; + documents += other.documents; + } +}; + +} diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h new file mode 100644 index 00000000000..8d848450cd2 --- /dev/null +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -0,0 +1,68 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once
+
+#include "bucket_space_distribution_configs.h"
+#include "pending_bucket_space_db_transition_entry.h"
+#include "potential_data_loss_report.h"
+#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/storageapi/defs.h>
+#include <unordered_set> // TODO use hash_set instead
+
+namespace storage::lib {
+class ClusterState;
+class ClusterStateBundle;
+class Distribution;
+}
+
+namespace storage::distributor {
+
+
+/**
+ * A stripe access guard guarantees that the holder of a guard can access underlying
+ * stripes via it in a thread safe manner. In particular, while any access guard is
+ * held, all stripe threads must be in a safe rendezvous location with no race conditions
+ * possible. When a guard goes out of scope, the stripe threads may resume operation.
+ */
+class StripeAccessGuard {
+public:
+    virtual ~StripeAccessGuard() = default;
+
+    virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+    virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
+    virtual void clear_pending_cluster_state_bundle() = 0;
+    virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) = 0;
+    virtual void notify_distribution_change_enabled() = 0;
+
+    virtual PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+                                                               const lib::ClusterState& new_state,
+                                                               bool is_distribution_change) = 0;
+    virtual void merge_entries_into_db(document::BucketSpace bucket_space,
+                                       api::Timestamp gathered_at_timestamp,
+                                       const lib::Distribution& distribution,
+                                       const lib::ClusterState& new_state,
+                                       const char* storage_up_states,
+                                       const std::unordered_set<uint16_t>& outdated_nodes,
+                                       const std::vector<dbtransition::Entry>& entries) = 0;
+
+    virtual void update_read_snapshot_before_db_pruning() = 0;
+    virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
+    virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
+    virtual void clear_read_only_bucket_repo_databases() = 0;
+
+};
+
+/**
+ * Provides a factory for guards that protect access to underlying stripes.
+ *
+ * Important: at most one StripeAccessGuard may exist at any given time. Creating
+ * concurrent guards is undefined behavior.
+ */
+class StripeAccessor {
+public:
+    virtual ~StripeAccessor() = default;
+
+    virtual std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() = 0;
+    // TODO also accessor for a single particular stripe?
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp new file mode 100644 index 00000000000..cf1781e0ae8 --- /dev/null +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -0,0 +1,1132 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
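Before diving into the implementation file that begins here, it is worth dwelling on the StripeAccessGuard contract above: while a guard exists, every stripe thread is parked at a rendezvous point, and they resume when the guard is destroyed. A minimal sketch of one way such a rendezvous can be built from a mutex and a condition variable; all names here are hypothetical and this is not the actual Vespa implementation (which must also integrate with the stripes' event loops):

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical rendezvous point shared between N stripe threads and a guard holder.
// Assumes each stripe thread calls maybe_park() regularly at a safe point.
class StripeRendezvous {
    std::mutex              _mtx;
    std::condition_variable _cv;
    bool     _hold_requested = false; // set by the guard holder
    unsigned _parked = 0;             // stripe threads currently parked
    const unsigned _num_stripes;
public:
    explicit StripeRendezvous(unsigned num_stripes) : _num_stripes(num_stripes) {}

    // Called by each stripe thread at a safe point in its event loop.
    void maybe_park() {
        std::unique_lock lock(_mtx);
        while (_hold_requested) {
            ++_parked;
            _cv.notify_all(); // wake a holder waiting for full rendezvous
            _cv.wait(lock, [&] { return !_hold_requested; });
            --_parked;
        }
    }

    // Called by the (single) guard: blocks until all stripes are parked.
    void hold_all() {
        std::unique_lock lock(_mtx);
        _hold_requested = true;
        _cv.wait(lock, [&] { return _parked == _num_stripes; });
    }

    // Called when the guard is destroyed; stripes resume.
    void release_all() {
        std::lock_guard lock(_mtx);
        _hold_requested = false;
        _cv.notify_all();
    }
};
```

A concrete StripeAccessor could hand out a guard whose constructor calls hold_all() and whose destructor calls release_all(), which is what makes "at most one guard at a time" a hard requirement.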
+ +#include "stripe_bucket_db_updater.h" +#include "bucket_db_prune_elision.h" +#include "bucket_space_distribution_context.h" +#include "distributor.h" +#include "distributor_bucket_space.h" +#include "distributormetricsset.h" +#include "pending_bucket_space_db_transition.h" +#include "potential_data_loss_report.h" +#include "simpleclusterinformation.h" +#include "stripe_access_guard.h" +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/util/xmlstream.h> +#include <thread> + +#include <vespa/log/bufferedlogger.h> +LOG_SETUP(".distributor.stripe_bucket_db_updater"); + +using storage::lib::Node; +using storage::lib::NodeType; +using document::BucketSpace; + +namespace storage::distributor { + +StripeBucketDBUpdater::StripeBucketDBUpdater(DistributorStripeInterface& owner, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, + DistributorMessageSender& sender, + DistributorComponentRegister& compReg) + : framework::StatusReporter("bucketdb", "Bucket DB Updater"), + _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), + _node_ctx(_distributorComponent), + _op_ctx(_distributorComponent), + _distributor_interface(_distributorComponent.getDistributor()), + _delayedRequests(), + _sentMessages(), + _pendingClusterState(), + _history(), + _sender(sender), + _enqueuedRechecks(), + _outdatedNodesMap(), + _transitionTimer(_node_ctx.clock()), + _stale_reads_enabled(false), + _active_distribution_contexts(), + _explicit_transition_read_guard(), + _distribution_context_mutex() +{ + for (auto& elem : _op_ctx.bucket_space_repo()) { + _active_distribution_contexts.emplace( + elem.first, + BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index())); + _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); + } +} + +StripeBucketDBUpdater::~StripeBucketDBUpdater() = default; + +OperationRoutingSnapshot StripeBucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const { + const auto bucket_space = bucket.getBucketSpace(); + std::lock_guard lock(_distribution_context_mutex); + auto active_state_iter = _active_distribution_contexts.find(bucket_space); + assert(active_state_iter != _active_distribution_contexts.cend()); + auto& state = *active_state_iter->second; + if (!state.bucket_owned_in_active_state(bucket.getBucketId())) { + return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); + } + const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId()); + if (!bucket_present_in_mutable_db && !stale_reads_enabled()) { + return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); + } + const auto& space_repo = bucket_present_in_mutable_db + ? _op_ctx.bucket_space_repo() + : _op_ctx.read_only_bucket_space_repo(); + auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space); + assert(existing_guard_iter != _explicit_transition_read_guard.cend()); + auto db_guard = existing_guard_iter->second + ? 
existing_guard_iter-> second + : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard(); + return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo); +} + +void +StripeBucketDBUpdater::flush() +{ + for (auto & entry : _sentMessages) { + // Cannot sendDown MergeBucketReplies during flushing, since + // all lower links have been closed + if (entry.second._mergeReplyGuard) { + entry.second._mergeReplyGuard->resetReply(); + } + } + _sentMessages.clear(); +} + +void +StripeBucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& indent) const +{ + (void) verbose; (void) indent; + out << "BucketDBUpdater"; +} + +bool +StripeBucketDBUpdater::shouldDeferStateEnabling() const noexcept +{ + return stale_reads_enabled(); +} + +bool +StripeBucketDBUpdater::hasPendingClusterState() const +{ + // Defer to the repo instead of checking our own internal pending cluster state, + // as we won't have one if the top level distributor handles this for all stripes. + // But if we're operating in "legacy" mode with this stripe bucket DB updater as + // the authoritative source, there should always be an internal pending cluster + // state if the repo is tagged as having one as well. + // Since we also set a pending cluster state bundle when triggered by a distribution + // config change, this check also covers that case. + return _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).has_pending_cluster_state(); +} + +const lib::ClusterState* +StripeBucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const { + auto& distr_space = _op_ctx.bucket_space_repo().get(space); + return (distr_space.has_pending_cluster_state() + ? &distr_space.get_pending_cluster_state() + : nullptr); +} + +void +StripeBucketDBUpdater::sendRequestBucketInfo( + uint16_t node, + const document::Bucket& bucket, + const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) +{ + if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) { + return; + } + + std::vector<document::BucketId> buckets; + buckets.push_back(bucket.getBucketId()); + + auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets); + + LOG(debug, + "Sending request bucket info command %" PRIu64 " for " + "bucket %s to node %u", + msg->getMsgId(), + bucket.toString().c_str(), + node); + + msg->setPriority(50); + msg->setAddress(_node_ctx.node_address(node)); + + _sentMessages[msg->getMsgId()] = + BucketRequest(node, _op_ctx.generate_unique_timestamp(), + bucket, mergeReplyGuard); + _sender.sendCommand(msg); +} + +void +StripeBucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx, + const document::Bucket& bucket) +{ + sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>()); +} + +namespace { + +class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor { + using NewEntries = std::vector<BucketDatabase::Entry>; + NewEntries::const_iterator _current; + const NewEntries::const_iterator _last; +public: + explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries) + : _current(new_entries.cbegin()), + _last(new_entries.cend()) + {} + + Result merge(BucketDatabase::Merger& m) override { + const uint64_t key_to_insert = m.bucket_key(); + uint64_t key_at_cursor = 0; + while (_current != _last) { + key_at_cursor = _current->getBucketId().toKey(); + if (key_at_cursor >= key_to_insert) { + break; + } + m.insert_before_current(_current->getBucketId(), *_current); + 
++_current; + } + if ((_current != _last) && (key_at_cursor == key_to_insert)) { + // If we encounter a bucket that already exists, replace value wholesale. + // Don't try to cleverly merge replicas, as the values we currently hold + // in the read-only DB may be stale. + // Note that this case shouldn't really happen, since we only add previously + // owned buckets to the read-only DB, and subsequent adds to a non-empty DB + // can only happen for state preemptions. Since ownership is not regained + // before a state is stable, a bucket is only added once. But we handle it + // anyway in case this changes at some point in the future. + m.current_entry() = *_current; + return Result::Update; + } + return Result::KeepUnchanged; + } + + void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override { + for (; _current != _last; ++_current) { + inserter.insert_at_end(_current->getBucketId(), *_current); + } + } +}; + +} + +void +StripeBucketDBUpdater::removeSuperfluousBuckets( + const lib::ClusterStateBundle& newState, + bool is_distribution_config_change) +{ + const bool move_to_read_only_db = shouldDeferStateEnabling(); + const char* up_states = _op_ctx.storage_node_up_states(); + for (auto& elem : _op_ctx.bucket_space_repo()) { + const auto& newDistribution(elem.second->getDistribution()); + const auto& oldClusterState(elem.second->getClusterState()); + const auto& new_cluster_state = newState.getDerivedClusterState(elem.first); + + // Running a full DB sweep is expensive, so if the cluster state transition does + // not actually indicate that buckets should possibly be removed, we elide it entirely. + if (!is_distribution_config_change + && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states)) + { + LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'", + document::FixedBucketSpaces::to_string(elem.first).data(), + oldClusterState.toString().c_str(), new_cluster_state->toString().c_str()); + continue; + } + + auto& bucketDb(elem.second->getBucketDatabase()); + auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase()); + + // Remove all buckets not belonging to this distributor, or + // being on storage nodes that are no longer up. + MergingNodeRemover proc( + oldClusterState, + *new_cluster_state, + _node_ctx.node_index(), + newDistribution, + up_states, + move_to_read_only_db); + + bucketDb.merge(proc); + if (move_to_read_only_db) { + ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries()); + readOnlyDb.merge(read_only_merger); + } + maybe_inject_simulated_db_pruning_delay(); + } +} + +PotentialDataLossReport +StripeBucketDBUpdater::remove_superfluous_buckets( + document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) +{ + (void)is_distribution_change; // TODO remove if not needed + const bool move_to_read_only_db = shouldDeferStateEnabling(); + const char* up_states = _op_ctx.storage_node_up_states(); + + auto& s = _op_ctx.bucket_space_repo().get(bucket_space); + const auto& new_distribution = s.getDistribution(); + const auto& old_cluster_state = s.getClusterState(); + // Elision of DB sweep is done at a higher level, so we don't have to do that here. + auto& bucket_db = s.getBucketDatabase(); + auto& read_only_db = _op_ctx.read_only_bucket_space_repo().get(bucket_space).getBucketDatabase(); + + // Remove all buckets not belonging to this distributor, or + // being on storage nodes that are no longer up. 
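An aside on the db_pruning_may_be_elided() fast path used in removeSuperfluousBuckets() above: a full DB sweep touches every bucket, so transitions that cannot possibly remove replicas are skipped outright. The following is a rough, self-contained sketch of the kind of check such a predicate can perform; it is illustrative only (the real implementation lives in bucket_db_prune_elision.h and may consider additional state, such as distributor-node changes):

```cpp
#include <vespa/vdslib/state/clusterstate.h>
#include <algorithm>
#include <cstdint>

// Sketch: pruning can only remove buckets if ownership may move (distribution
// bit change) or a storage node left the "up" set. Otherwise the sweep is moot.
bool pruning_may_be_elided_sketch(const storage::lib::ClusterState& old_state,
                                  const storage::lib::ClusterState& new_state,
                                  const char* up_states)
{
    using storage::lib::Node;
    using storage::lib::NodeType;
    // A distribution bit change moves superbucket ownership; must prune.
    if (old_state.getDistributionBitCount() != new_state.getDistributionBitCount()) {
        return false;
    }
    const uint16_t nodes = std::max(old_state.getNodeCount(NodeType::STORAGE),
                                    new_state.getNodeCount(NodeType::STORAGE));
    for (uint16_t i = 0; i < nodes; ++i) {
        const Node node(NodeType::STORAGE, i);
        const bool was_up = old_state.getNodeState(node).getState().oneOf(up_states);
        const bool is_up  = new_state.getNodeState(node).getState().oneOf(up_states);
        if (was_up && !is_up) {
            return false; // replicas on node i would have to be pruned away
        }
    }
    return true;
}
```

When the fast path does not apply, the MergingNodeRemover constructed next performs the actual sweep.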
+ MergingNodeRemover proc( + old_cluster_state, + new_state, + _node_ctx.node_index(), + new_distribution, + up_states, + move_to_read_only_db); + + bucket_db.merge(proc); + if (move_to_read_only_db) { + ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries()); + read_only_db.merge(read_only_merger); + } + PotentialDataLossReport report; + report.buckets = proc.removed_buckets(); + report.documents = proc.removed_documents(); + return report; +} + +void +StripeBucketDBUpdater::merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) +{ + auto& s = _op_ctx.bucket_space_repo().get(bucket_space); + auto& bucket_db = s.getBucketDatabase(); + + PendingBucketSpaceDbTransition::DbMerger merger(gathered_at_timestamp, distribution, new_state, + storage_up_states, outdated_nodes, entries); + bucket_db.merge(merger); +} + +namespace { + +void maybe_sleep_for(std::chrono::milliseconds ms) { + if (ms.count() > 0) { + std::this_thread::sleep_for(ms); + } +} + +} + +void +StripeBucketDBUpdater::maybe_inject_simulated_db_pruning_delay() { + maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency()); +} + +void +StripeBucketDBUpdater::maybe_inject_simulated_db_merging_delay() { + maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency()); +} + +void +StripeBucketDBUpdater::ensureTransitionTimerStarted() +{ + // Don't overwrite start time if we're already processing a state, as + // that will make transition times appear artificially low. + if (!hasPendingClusterState()) { + _transitionTimer = framework::MilliSecTimer( + _node_ctx.clock()); + } +} + +void +StripeBucketDBUpdater::completeTransitionTimer() +{ + _distributor_interface.getMetrics() + .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble()); +} + +void +StripeBucketDBUpdater::clearReadOnlyBucketRepoDatabases() +{ + for (auto& space : _op_ctx.read_only_bucket_space_repo()) { + space.second->getBucketDatabase().clear(); + } +} + +void +StripeBucketDBUpdater::storageDistributionChanged() +{ + ensureTransitionTimerStarted(); + + removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true); + + auto clusterInfo = std::make_shared<const SimpleClusterInformation>( + _node_ctx.node_index(), + _op_ctx.cluster_state_bundle(), + _op_ctx.storage_node_up_states()); + _pendingClusterState = PendingClusterState::createForDistributionChange( + _node_ctx.clock(), + std::move(clusterInfo), + _sender, + _op_ctx.bucket_space_repo(), + _op_ctx.generate_unique_timestamp()); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); + _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); +} + +void +StripeBucketDBUpdater::replyToPreviousPendingClusterStateIfAny() +{ + if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) { + _distributor_interface.getMessageSender().sendUp( + std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand())); + } +} + +void +StripeBucketDBUpdater::replyToActivationWithActualVersion( + const api::ActivateClusterStateVersionCommand& cmd, + uint32_t actualVersion) +{ + auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd); + reply->setActualVersion(actualVersion); + 
_distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_before_db_pruning() {
+    std::lock_guard lock(_distribution_context_mutex);
+    for (auto& elem : _op_ctx.bucket_space_repo()) {
+        // At this point, we're still operating with a distribution context _without_ a
+        // pending state, i.e. anyone using the context will expect to find buckets
+        // in the DB that correspond to what the database looked like prior to pruning
+        // buckets from the DB. To ensure this is not violated, take a snapshot of the
+        // _mutable_ DB and expose this. This snapshot only lives until we atomically
+        // flip to expose a distribution context that includes the new, pending state.
+        // At that point, the read-only DB is known to contain the buckets that have
+        // been pruned away, so we can release the mutable DB snapshot safely.
+        // TODO test for, and handle, state preemption case!
+        _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
+    }
+}
+
+
+void StripeBucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+    std::lock_guard lock(_distribution_context_mutex);
+    const auto old_default_state = _op_ctx.bucket_space_repo().get(
+            document::FixedBucketSpaces::default_space()).cluster_state_sp();
+    for (auto& elem : _op_ctx.bucket_space_repo()) {
+        auto new_distribution  = elem.second->distribution_sp();
+        auto old_cluster_state = elem.second->cluster_state_sp();
+        auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
+        _active_distribution_contexts.insert_or_assign(
+                elem.first,
+                BucketSpaceDistributionContext::make_state_transition(
+                        std::move(old_cluster_state),
+                        old_default_state,
+                        std::move(new_cluster_state),
+                        std::move(new_distribution),
+                        _node_ctx.node_index()));
+        // We can now remove the explicit mutable DB snapshot, as the buckets that have been
+        // pruned away are visible in the read-only DB.
+        _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
+    }
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+    std::lock_guard lock(_distribution_context_mutex);
+    const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
+    for (auto& elem : _op_ctx.bucket_space_repo()) {
+        auto new_distribution  = elem.second->distribution_sp();
+        auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
+        _active_distribution_contexts.insert_or_assign(
+                elem.first,
+                BucketSpaceDistributionContext::make_stable_state(
+                        std::move(new_cluster_state),
+                        default_cluster_state,
+                        std::move(new_distribution),
+                        _node_ctx.node_index()));
+    }
+}
+
+bool
+StripeBucketDBUpdater::onSetSystemState(
+        const std::shared_ptr<api::SetSystemStateCommand>& cmd)
+{
+    LOG(debug,
+        "Received new cluster state %s",
+        cmd->getSystemState().toString().c_str());
+
+    const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
+    const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
+
+    if (state == oldState) {
+        return false;
+    }
+    ensureTransitionTimerStarted();
+    // Separate timer since _transitionTimer might span multiple pending states.
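The snapshot hand-over described in the comment inside update_read_snapshot_before_db_pruning() above is the subtle part of this file: readers must never observe a "gap" where pruned buckets are visible in neither database. A minimal, self-contained sketch of the pattern, with hypothetical stand-in types (the real code uses _explicit_transition_read_guard and BucketSpaceDistributionContext):

```cpp
#include <memory>
#include <mutex>

struct Snapshot { /* a pinned, immutable view of a bucket database */ };

// Hypothetical publisher mirroring the pin/flip/release sequence above.
class SnapshotPublisher {
    std::mutex _mtx;
    std::shared_ptr<Snapshot> _pinned_mutable_view; // only set during transitions
public:
    // Step 1: before pruning, pin the pre-pruning mutable view for readers.
    void before_pruning(std::shared_ptr<Snapshot> mutable_view) {
        std::lock_guard g(_mtx);
        _pinned_mutable_view = std::move(mutable_view);
    }
    // Step 2: after pruning, the pruned buckets are visible in the read-only DB,
    // so the pinned view can be released without readers observing a gap.
    void after_pruning() {
        std::lock_guard g(_mtx);
        _pinned_mutable_view.reset();
    }
    // Readers atomically get either the pinned transition view or the live one.
    std::shared_ptr<Snapshot> current_view(std::shared_ptr<Snapshot> live) {
        std::lock_guard g(_mtx);
        return _pinned_mutable_view ? _pinned_mutable_view : std::move(live);
    }
};
```

With that picture in mind, the state-transition handling continues below, timed by the separate per-state timer just mentioned.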
+    framework::MilliSecTimer process_timer(_node_ctx.clock());
+    update_read_snapshot_before_db_pruning();
+    const auto& bundle = cmd->getClusterStateBundle();
+    removeSuperfluousBuckets(bundle, false);
+    update_read_snapshot_after_db_pruning(bundle);
+    replyToPreviousPendingClusterStateIfAny();
+
+    auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+            _node_ctx.node_index(),
+            _op_ctx.cluster_state_bundle(),
+            _op_ctx.storage_node_up_states());
+    _pendingClusterState = PendingClusterState::createForClusterStateChange(
+            _node_ctx.clock(),
+            std::move(clusterInfo),
+            _sender,
+            _op_ctx.bucket_space_repo(),
+            cmd,
+            _outdatedNodesMap,
+            _op_ctx.generate_unique_timestamp());
+    _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+
+    _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
+            process_timer.getElapsedTimeAsDouble());
+
+    _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+    if (isPendingClusterStateCompleted()) {
+        processCompletedPendingClusterState();
+    }
+    return true;
+}
+
+bool
+StripeBucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+    if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
+        const auto pending_version = _pendingClusterState->clusterStateVersion();
+        if (pending_version == cmd->version()) {
+            if (isPendingClusterStateCompleted()) {
+                assert(_pendingClusterState->isDeferred());
+                activatePendingClusterState();
+            } else {
+                LOG(error, "Received cluster state activation for pending version %u "
+                           "without pending state being complete yet. This is not expected, "
+                           "as no activation should be sent before all distributors have "
+                           "reported that state processing is complete.", pending_version);
+                replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+                return true;
+            }
+        } else {
+            replyToActivationWithActualVersion(*cmd, pending_version);
+            return true;
+        }
+    } else if (shouldDeferStateEnabling()) {
+        // Likely just a resend, but log a warning for now to get a feel for how common it is.
+        LOG(warning, "Received cluster state activation command for version %u, which "
+                     "has no corresponding pending state. Likely resent operation.", cmd->version());
+    } else {
+        LOG(debug, "Received cluster state activation command for version %u, but distributor "
+                   "config does not have deferred activation enabled. Treating as no-op.", cmd->version());
+    }
+    // Fall through to next link in call chain that cares about this message.
+    return false;
+}
+
+StripeBucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
+{
+    if (_reply) {
+        _distributor_interface.handleCompletedMerge(_reply);
+    }
+}
+
+bool
+StripeBucketDBUpdater::onMergeBucketReply(
+        const std::shared_ptr<api::MergeBucketReply>& reply)
+{
+    auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
+
+    // In case the merge was somehow unsuccessful, or some nodes weren't
+    // actually merged (source-only nodes?), we request the bucket info for the
+    // bucket again to make sure it's OK.
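The MergeReplyGuard created above is a compact idiom worth spelling out: the merge reply is not forwarded when onMergeBucketReply() returns, but when the last shared_ptr reference to the guard is dropped, i.e. only after every recheck that captured it has completed. The same pattern in miniature, with hypothetical names, as a shared_ptr destructor used as a completion barrier:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// Fires its callback exactly once, when the last reference goes away.
struct ReplyBarrier {
    std::function<void()> on_all_done;
    ~ReplyBarrier() { if (on_all_done) on_all_done(); }
};

int main() {
    std::vector<std::shared_ptr<ReplyBarrier>> in_flight;
    {
        auto guard = std::make_shared<ReplyBarrier>();
        guard->on_all_done = [] { std::cout << "forwarding merge reply\n"; };
        for (int node = 0; node < 3; ++node) {
            in_flight.push_back(guard); // each pending recheck keeps the reply alive
        }
    } // local reference dropped; reply still pending
    in_flight.clear(); // last recheck completes -> barrier fires -> reply forwarded
    return 0;
}
```

The loop that follows creates exactly this fan-out: one bucket-info recheck per node in the merge reply, each sharing the guard.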
+ for (uint32_t i = 0; i < reply->getNodes().size(); i++) { + sendRequestBucketInfo(reply->getNodes()[i].index, + reply->getBucket(), + replyGuard); + } + + return true; +} + +void +StripeBucketDBUpdater::enqueueRecheckUntilPendingStateEnabled( + uint16_t node, + const document::Bucket& bucket) +{ + LOG(spam, + "DB updater has a pending cluster state, enqueuing recheck " + "of bucket %s on node %u until state is done processing", + bucket.toString().c_str(), + node); + _enqueuedRechecks.insert(EnqueuedBucketRecheck(node, bucket)); +} + +void +StripeBucketDBUpdater::sendAllQueuedBucketRechecks() +{ + LOG(spam, + "Sending %zu queued bucket rechecks previously received " + "via NotifyBucketChange commands", + _enqueuedRechecks.size()); + + for (const auto & entry :_enqueuedRechecks) { + sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>()); + } + _enqueuedRechecks.clear(); +} + +bool +StripeBucketDBUpdater::onNotifyBucketChange( + const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd) +{ + // Immediately schedule reply to ensure it is sent. + _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd)); + + if (!cmd->getBucketInfo().valid()) { + LOG(error, + "Received invalid bucket info for bucket %s from notify bucket " + "change! Not updating bucket.", + cmd->getBucketId().toString().c_str()); + return true; + } + LOG(debug, + "Received notify bucket change from node %u for bucket %s with %s.", + cmd->getSourceIndex(), + cmd->getBucketId().toString().c_str(), + cmd->getBucketInfo().toString().c_str()); + + if (hasPendingClusterState()) { + enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(), + cmd->getBucket()); + } else { + sendRequestBucketInfo(cmd->getSourceIndex(), + cmd->getBucket(), + std::shared_ptr<MergeReplyGuard>()); + } + + return true; +} + +namespace { + +bool sort_pred(const BucketListMerger::BucketEntry& left, + const BucketListMerger::BucketEntry& right) { + return left.first < right.first; +} + +} + +bool +StripeBucketDBUpdater::onRequestBucketInfoReply( + const std::shared_ptr<api::RequestBucketInfoReply> & repl) +{ + if (pendingClusterStateAccepted(repl)) { + return true; + } + return processSingleBucketInfoReply(repl); +} + +bool +StripeBucketDBUpdater::pendingClusterStateAccepted( + const std::shared_ptr<api::RequestBucketInfoReply> & repl) +{ + if (_pendingClusterState.get() + && _pendingClusterState->onRequestBucketInfoReply(repl)) + { + if (isPendingClusterStateCompleted()) { + processCompletedPendingClusterState(); + } + return true; + } + LOG(spam, + "Reply %s was not accepted by pending cluster state", + repl->toString().c_str()); + return false; +} + +void +StripeBucketDBUpdater::handleSingleBucketInfoFailure( + const std::shared_ptr<api::RequestBucketInfoReply>& repl, + const BucketRequest& req) +{ + LOG(debug, "Request bucket info failed towards node %d: error was %s", + req.targetNode, repl->getResult().toString().c_str()); + + if (req.bucket.getBucketId() != document::BucketId(0)) { + framework::MilliSecTime sendTime(_node_ctx.clock()); + sendTime += framework::MilliSecTime(100); + _delayedRequests.emplace_back(sendTime, req); + } +} + +void +StripeBucketDBUpdater::resendDelayedMessages() +{ + if (_pendingClusterState) { + _pendingClusterState->resendDelayedMessages(); + } + if (_delayedRequests.empty()) { + return; // Don't fetch time if not needed + } + framework::MilliSecTime currentTime(_node_ctx.clock()); + while (!_delayedRequests.empty() + && currentTime >= _delayedRequests.front().first) + { + 
BucketRequest& req(_delayedRequests.front().second); + sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>()); + _delayedRequests.pop_front(); + } +} + +void +StripeBucketDBUpdater::convertBucketInfoToBucketList( + const std::shared_ptr<api::RequestBucketInfoReply>& repl, + uint16_t targetNode, BucketListMerger::BucketList& newList) +{ + for (const auto & entry : repl->getBucketInfo()) { + LOG(debug, "Received bucket information from node %u for bucket %s: %s", targetNode, + entry._bucketId.toString().c_str(), entry._info.toString().c_str()); + + newList.emplace_back(entry._bucketId, entry._info); + } +} + +void +StripeBucketDBUpdater::mergeBucketInfoWithDatabase( + const std::shared_ptr<api::RequestBucketInfoReply>& repl, + const BucketRequest& req) +{ + BucketListMerger::BucketList existing; + BucketListMerger::BucketList newList; + + findRelatedBucketsInDatabase(req.targetNode, req.bucket, existing); + convertBucketInfoToBucketList(repl, req.targetNode, newList); + + std::sort(existing.begin(), existing.end(), sort_pred); + std::sort(newList.begin(), newList.end(), sort_pred); + + BucketListMerger merger(newList, existing, req.timestamp); + updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger); +} + +bool +StripeBucketDBUpdater::processSingleBucketInfoReply( + const std::shared_ptr<api::RequestBucketInfoReply> & repl) +{ + auto iter = _sentMessages.find(repl->getMsgId()); + + // Has probably been deleted for some reason earlier. + if (iter == _sentMessages.end()) { + return true; + } + + BucketRequest req = iter->second; + _sentMessages.erase(iter); + + if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { + // Ignore replies from nodes that are down. + return true; + } + if (repl->getResult().getResult() != api::ReturnCode::OK) { + handleSingleBucketInfoFailure(repl, req); + return true; + } + mergeBucketInfoWithDatabase(repl, req); + return true; +} + +void +StripeBucketDBUpdater::addBucketInfoForNode( + const BucketDatabase::Entry& e, + uint16_t node, + BucketListMerger::BucketList& existing) const +{ + const BucketCopy* copy(e->getNode(node)); + if (copy) { + existing.emplace_back(e.getBucketId(), copy->getBucketInfo()); + } +} + +void +StripeBucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, + BucketListMerger::BucketList& existing) +{ + auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace())); + std::vector<BucketDatabase::Entry> entries; + distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries); + + for (const BucketDatabase::Entry & entry : entries) { + addBucketInfoForNode(entry, node, existing); + } +} + +void +StripeBucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger) +{ + for (const document::BucketId & bucketId : merger.getRemovedEntries()) { + document::Bucket bucket(bucketSpace, bucketId); + _op_ctx.remove_node_from_bucket_database(bucket, node); + } + + for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) { + document::Bucket bucket(bucketSpace, entry.first); + _op_ctx.update_bucket_database( + bucket, + BucketCopy(merger.getTimestamp(), node, entry.second), + DatabaseUpdate::CREATE_IF_NONEXISTING); + } +} + +bool +StripeBucketDBUpdater::isPendingClusterStateCompleted() const +{ + return _pendingClusterState.get() && _pendingClusterState->done(); +} + +void +StripeBucketDBUpdater::processCompletedPendingClusterState() +{ + if 
(_pendingClusterState->isDeferred()) {
+        LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
+            _pendingClusterState->clusterStateVersion());
+        assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+        // Sending down the SetSystemState command will reach the state manager, and a reply
+        // will be auto-sent back to the cluster controller in charge. Once this happens,
+        // the cluster controller will send an explicit activation command after all
+        // distributors have reported that their pending cluster states have completed.
+        // A booting distributor will treat itself as "system Up" before the state has actually
+        // taken effect via activation. The external operation handler will keep operations from
+        // actually being scheduled until the state has been activated. The external operation
+        // handler needs to be explicitly aware of the case where no state has been activated yet.
+        _distributor_interface.getMessageSender().sendDown(
+                _pendingClusterState->getCommand());
+        _pendingClusterState->clearCommand();
+        return;
+    }
+    // Distribution config change or non-deferred cluster state. Immediately activate
+    // the pending state without being told to do so explicitly.
+    activatePendingClusterState();
+}
+
+void
+StripeBucketDBUpdater::activatePendingClusterState()
+{
+    framework::MilliSecTimer process_timer(_node_ctx.clock());
+
+    _pendingClusterState->mergeIntoBucketDatabases();
+    maybe_inject_simulated_db_merging_delay();
+
+    if (_pendingClusterState->isVersionedTransition()) {
+        LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
+        enableCurrentClusterStateBundleInDistributor();
+        if (_pendingClusterState->hasCommand()) {
+            _distributor_interface.getMessageSender().sendDown(
+                    _pendingClusterState->getCommand());
+        }
+        addCurrentStateToClusterStateHistory();
+    } else {
+        LOG(debug, "Activating pending distribution config");
+        // TODO distribution changes cannot currently be deferred as they are not
+        // initiated by the cluster controller!
+        _distributor_interface.notifyDistributionChangeEnabled();
+    }
+
+    update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
+    _pendingClusterState.reset();
+    _outdatedNodesMap.clear();
+    _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); // TODO also read only bucket space..?
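Taken together, processCompletedPendingClusterState() and onActivateClusterStateVersion() earlier in this file implement a small two-phase (prepare/activate) protocol with the cluster controller. An editorial condensation of the activation decision into a single table, with hypothetical enum and parameter names; the function bodies above remain the authoritative logic (the non-versioned-transition case is folded into "no pending version" here):

```cpp
#include <cstdint>
#include <optional>

enum class ActivationOutcome {
    Activate,            // pending state complete; enable it now
    ReplyInvalidVersion, // not complete yet; controller will re-send activation
    ReplyActualVersion,  // version mismatch; report which version is pending
    IgnoreLikelyResend,  // no pending state, but deferred enabling is configured
    NoOp                 // deferred activation not enabled at all
};

ActivationOutcome decide(std::optional<uint32_t> pending_version,
                         bool pending_complete,
                         bool deferred_enabling,
                         uint32_t requested_version)
{
    if (pending_version) {
        if (*pending_version == requested_version) {
            return pending_complete ? ActivationOutcome::Activate
                                    : ActivationOutcome::ReplyInvalidVersion;
        }
        return ActivationOutcome::ReplyActualVersion;
    }
    return deferred_enabling ? ActivationOutcome::IgnoreLikelyResend
                             : ActivationOutcome::NoOp;
}
```

Activation then finishes below by flushing queued bucket rechecks and clearing the read-only databases.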
+ sendAllQueuedBucketRechecks(); + completeTransitionTimer(); + clearReadOnlyBucketRepoDatabases(); + + _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue( + process_timer.getElapsedTimeAsDouble()); +} + +void +StripeBucketDBUpdater::enableCurrentClusterStateBundleInDistributor() +{ + const lib::ClusterStateBundle& state( + _pendingClusterState->getNewClusterStateBundle()); + + LOG(debug, + "BucketDBUpdater finished processing state %s", + state.getBaselineClusterState()->toString().c_str()); + + _distributor_interface.enableClusterStateBundle(state); +} + +void StripeBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { + update_read_snapshot_after_activation(activated_state); + _distributor_interface.enableClusterStateBundle(activated_state); +} + +void +StripeBucketDBUpdater::addCurrentStateToClusterStateHistory() +{ + _history.push_back(_pendingClusterState->getSummary()); + + if (_history.size() > 50) { + _history.pop_front(); + } +} + +vespalib::string +StripeBucketDBUpdater::getReportContentType(const framework::HttpUrlPath&) const +{ + return "text/xml"; +} + +namespace { + +const vespalib::string ALL = "all"; +const vespalib::string BUCKETDB = "bucketdb"; +const vespalib::string BUCKETDB_UPDATER = "Bucket Database Updater"; + +} + +void +StripeBucketDBUpdater::BucketRequest::print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute ×tampAttribute) const +{ + using namespace vespalib::xml; + xos << XmlTag("storagenode") + << XmlAttribute("index", targetNode); + xos << XmlAttribute("bucketspace", bucket.getBucketSpace().getId(), XmlAttribute::HEX); + if (bucket.getBucketId().getRawId() == 0) { + xos << XmlAttribute("bucket", ALL); + } else { + xos << XmlAttribute("bucket", bucket.getBucketId().getId(), XmlAttribute::HEX); + } + xos << timestampAttribute << XmlEndTag(); +} + +bool +StripeBucketDBUpdater::reportStatus(std::ostream& out, + const framework::HttpUrlPath& path) const +{ + using namespace vespalib::xml; + XmlOutputStream xos(out); + // FIXME(vekterli): have to do this manually since we cannot inherit + // directly from XmlStatusReporter due to data races when BucketDBUpdater + // gets status requests directly. 
+ xos << XmlTag("status") + << XmlAttribute("id", BUCKETDB) + << XmlAttribute("name", BUCKETDB_UPDATER); + reportXmlStatus(xos, path); + xos << XmlEndTag(); + return true; +} + +vespalib::string +StripeBucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, + const framework::HttpUrlPath&) const +{ + using namespace vespalib::xml; + xos << XmlTag("bucketdb") + << XmlTag("systemstate_active") + << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString()) + << XmlEndTag(); + if (_pendingClusterState) { + xos << *_pendingClusterState; + } + xos << XmlTag("systemstate_history"); + for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) { + xos << XmlTag("change") + << XmlAttribute("from", i->_prevClusterState) + << XmlAttribute("to", i->_newClusterState) + << XmlAttribute("processingtime", i->_processingTime) + << XmlEndTag(); + } + xos << XmlEndTag() + << XmlTag("single_bucket_requests"); + for (const auto & entry : _sentMessages) + { + entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp)); + } + xos << XmlEndTag() + << XmlTag("delayed_single_bucket_requests"); + for (const auto & entry : _delayedRequests) + { + entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime())); + } + xos << XmlEndTag() << XmlEndTag(); + return ""; +} + +StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover( + const lib::ClusterState& oldState, + const lib::ClusterState& s, + uint16_t localIndex, + const lib::Distribution& distribution, + const char* upStates, + bool track_non_owned_entries) + : _oldState(oldState), + _state(s), + _available_nodes(), + _nonOwnedBuckets(), + _removed_buckets(0), + _removed_documents(0), + _localIndex(localIndex), + _distribution(distribution), + _upStates(upStates), + _track_non_owned_entries(track_non_owned_entries), + _cachedDecisionSuperbucket(UINT64_MAX), + _cachedOwned(false) +{ + // TODO intersection of cluster state and distribution config + const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE); + _available_nodes.resize(storage_count); + for (uint16_t i = 0; i < storage_count; ++i) { + if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) { + _available_nodes[i] = true; + } + } +} + +void +StripeBucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const +{ + LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg); +} + +namespace { + +uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept { + // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest. + return id.getRawId() & ~(UINT64_MAX << distribution_bits); +} + +} + +bool +StripeBucketDBUpdater::MergingNodeRemover::distributorOwnsBucket( + const document::BucketId& bucketId) const +{ + // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor. 
+ // TODO "too few bits used" case can be cheaply checked without needing exception + try { + const auto bits = _state.getDistributionBitCount(); + const auto this_superbucket = superbucket_from_id(bucketId, bits); + if (_cachedDecisionSuperbucket == this_superbucket) { + if (!_cachedOwned) { + logRemove(bucketId, "bucket now owned by another distributor (cached)"); + } + return _cachedOwned; + } + + uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim"); + _cachedDecisionSuperbucket = this_superbucket; + _cachedOwned = (distributor == _localIndex); + if (!_cachedOwned) { + logRemove(bucketId, "bucket now owned by another distributor"); + return false; + } + return true; + } catch (lib::TooFewBucketBitsInUseException& exc) { + logRemove(bucketId, "using too few distribution bits now"); + } catch (lib::NoDistributorsAvailableException& exc) { + logRemove(bucketId, "no distributors are available"); + } + return false; +} + +void +StripeBucketDBUpdater::MergingNodeRemover::setCopiesInEntry( + BucketDatabase::Entry& e, + const std::vector<BucketCopy>& copies) const +{ + e->clear(); + + std::vector<uint16_t> order = + _distribution.getIdealStorageNodes(_state, e.getBucketId(), _upStates); + + e->addNodes(copies, order); + + LOG(spam, "Changed %s", e->toString().c_str()); +} + +bool +StripeBucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const +{ + const uint16_t n_nodes = e->getNodeCount(); + for (uint16_t i = 0; i < n_nodes; i++) { + const uint16_t node_idx = e->getNodeRef(i).getNode(); + if (!storage_node_is_available(node_idx)) { + return true; + } + } + return false; +} + +BucketDatabase::MergingProcessor::Result +StripeBucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger) +{ + document::BucketId bucketId(merger.bucket_id()); + LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str()); + if (!distributorOwnsBucket(bucketId)) { + // TODO remove in favor of DB snapshotting + if (_track_non_owned_entries) { + _nonOwnedBuckets.emplace_back(merger.current_entry()); + } + return Result::Skip; + } + auto& e = merger.current_entry(); + + if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger? + return Result::Skip; + } + + if (!has_unavailable_nodes(e)) { + return Result::KeepUnchanged; + } + + std::vector<BucketCopy> remainingCopies; + for (uint16_t i = 0; i < e->getNodeCount(); i++) { + const uint16_t node_idx = e->getNodeRef(i).getNode(); + if (storage_node_is_available(node_idx)) { + remainingCopies.push_back(e->getNodeRef(i)); + } + } + + if (remainingCopies.empty()) { + ++_removed_buckets; + _removed_documents += e->getHighestDocumentCount(); + return Result::Skip; + } else { + setCopiesInEntry(e, remainingCopies); + return Result::Update; + } +} + +bool +StripeBucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept +{ + return ((index < _available_nodes.size()) && _available_nodes[index]); +} + +StripeBucketDBUpdater::MergingNodeRemover::~MergingNodeRemover() +{ + if (_removed_buckets != 0) { + LOGBM(info, "After cluster state change %s, %zu buckets no longer " + "have available replicas. 
%zu documents in these buckets will "
+                    "be unavailable until nodes come back up",
+              _oldState.getTextualDifference(_state).c_str(),
+              _removed_buckets, _removed_documents);
+    }
+}
+
+} // distributor
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h new file mode 100644 index 00000000000..e34d28a69bc --- /dev/null +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -0,0 +1,284 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "bucketlistmerger.h"
+#include "distributor_stripe_component.h"
+#include "distributormessagesender.h"
+#include "messageguard.h"
+#include "operation_routing_snapshot.h"
+#include "outdated_nodes_map.h"
+#include "pendingclusterstate.h"
+#include "potential_data_loss_report.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/messageapi/messagehandler.h>
+#include <vespa/storageframework/generic/clock/timer.h>
+#include <vespa/storageframework/generic/status/statusreporter.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <atomic>
+#include <list>
+#include <mutex>
+
+namespace vespalib::xml {
+class XmlOutputStream;
+class XmlAttribute;
+}
+
+namespace storage::distributor {
+
+class DistributorStripeInterface;
+class BucketSpaceDistributionContext;
+
+class StripeBucketDBUpdater final
+    : public framework::StatusReporter,
+      public api::MessageHandler
+{
+public:
+    using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
+    StripeBucketDBUpdater(DistributorStripeInterface& owner,
+                          DistributorBucketSpaceRepo& bucketSpaceRepo,
+                          DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+                          DistributorMessageSender& sender,
+                          DistributorComponentRegister& compReg);
+    ~StripeBucketDBUpdater() override;
+
+    void flush();
+    const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const;
+    void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
+
+    bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
+    bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
+    bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
+    bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
+    bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
+    void resendDelayedMessages();
+    void storageDistributionChanged();
+
+    vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
+    vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
+    bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+    void print(std::ostream& out, bool verbose, const std::string& indent) const;
+    const DistributorNodeContext& node_context() const { return _node_ctx; }
+    DistributorOperationContext& operation_context() { return _op_ctx; }
+
+    /**
+     * Returns whether the current PendingClusterState indicates that there has
+     * been a transfer of bucket ownership amongst the distributors in the
+     * cluster. This method only makes sense to call when _pendingClusterState
+     * is active, such as from within an enableClusterState() call.
+ */ + bool bucketOwnershipHasChanged() const { + return ((_pendingClusterState.get() != nullptr) + && _pendingClusterState->hasBucketOwnershipTransfer()); + } + void set_stale_reads_enabled(bool enabled) noexcept { + _stale_reads_enabled.store(enabled, std::memory_order_relaxed); + } + bool stale_reads_enabled() const noexcept { + return _stale_reads_enabled.load(std::memory_order_relaxed); + } + + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const; +private: + class MergeReplyGuard { + public: + MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept + : _distributor_interface(distributor_interface), _reply(reply) {} + + ~MergeReplyGuard(); + + // Used when we're flushing and simply want to drop the reply rather + // than send it down + void resetReply() { _reply.reset(); } + private: + DistributorStripeInterface& _distributor_interface; + std::shared_ptr<api::MergeBucketReply> _reply; + }; + + struct BucketRequest { + BucketRequest() + : targetNode(0), bucket(), timestamp(0) {}; + + BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b, + const std::shared_ptr<MergeReplyGuard>& guard) + : targetNode(t), + bucket(b), + timestamp(currentTime), + _mergeReplyGuard(guard) {}; + + void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute ×tampAttribute) const; + uint16_t targetNode; + document::Bucket bucket; + uint64_t timestamp; + + std::shared_ptr<MergeReplyGuard> _mergeReplyGuard; + }; + + struct EnqueuedBucketRecheck { + uint16_t node; + document::Bucket bucket; + + EnqueuedBucketRecheck() : node(0), bucket() {} + + EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket) + : node(_node), + bucket(_bucket) + {} + + bool operator<(const EnqueuedBucketRecheck& o) const { + if (node != o.node) { + return node < o.node; + } + return bucket < o.bucket; + } + bool operator==(const EnqueuedBucketRecheck& o) const { + return node == o.node && bucket == o.bucket; + } + }; + + friend class DistributorTestUtil; + // TODO refactor and rewire to avoid needing this direct meddling + friend class LegacySingleStripeAccessGuard; + // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor + // components agree on the currently active cluster state bundle. 
+    // Transitively invokes Distributor::enableClusterStateBundle
+    void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
+
+    bool shouldDeferStateEnabling() const noexcept;
+    bool hasPendingClusterState() const;
+    bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+    bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+    void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+                                       const BucketRequest& req);
+    bool isPendingClusterStateCompleted() const;
+    void processCompletedPendingClusterState();
+    void activatePendingClusterState();
+    void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+                                     const BucketRequest& req);
+    void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+                                       uint16_t targetNode, BucketListMerger::BucketList& newList);
+    void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
+                               const std::shared_ptr<MergeReplyGuard>& mergeReply);
+    void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
+                              BucketListMerger::BucketList& existing) const;
+    void ensureTransitionTimerStarted();
+    void completeTransitionTimer();
+    void clearReadOnlyBucketRepoDatabases();
+    /**
+     * Adds all buckets from the bucket database that have copies on the given
+     * node, and that are either contained in the given bucket or contain it.
+     */
+    void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
+                                      BucketListMerger::BucketList& existing);
+
+    /**
+       Updates the bucket database from the information generated by the given
+       bucket list merger.
+     */
+    void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
+
+    void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+
+    void update_read_snapshot_before_db_pruning();
+    void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
+                                  bool is_distribution_config_change);
+    void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
+    void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
+
+    // TODO STRIPE only called when stripe guard is held
+    PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+                                                       const lib::ClusterState& new_state,
+                                                       bool is_distribution_change);
+    void merge_entries_into_db(document::BucketSpace bucket_space,
+                               api::Timestamp gathered_at_timestamp,
+                               const lib::Distribution& distribution,
+                               const lib::ClusterState& new_state,
+                               const char* storage_up_states,
+                               const std::unordered_set<uint16_t>& outdated_nodes,
+                               const std::vector<dbtransition::Entry>& entries);
+
+    void replyToPreviousPendingClusterStateIfAny();
+    void replyToActivationWithActualVersion(
+            const api::ActivateClusterStateVersionCommand& cmd,
+            uint32_t actualVersion);
+
+    void enableCurrentClusterStateBundleInDistributor();
+    void addCurrentStateToClusterStateHistory();
+    void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
+    void sendAllQueuedBucketRechecks();
+
+    void maybe_inject_simulated_db_pruning_delay();
+    void maybe_inject_simulated_db_merging_delay();
+
+    /**
+       Removes all copies of buckets that are on nodes that are down.
+     */
+    class MergingNodeRemover : public BucketDatabase::MergingProcessor {
+    public:
+        MergingNodeRemover(const lib::ClusterState& oldState,
+                           const lib::ClusterState& s,
+                           uint16_t localIndex,
+                           const lib::Distribution& distribution,
+                           const char* upStates,
+                           bool track_non_owned_entries);
+        ~MergingNodeRemover() override;
+
+        Result merge(BucketDatabase::Merger&) override;
+        void logRemove(const document::BucketId& bucketId, const char* msg) const;
+        bool distributorOwnsBucket(const document::BucketId&) const;
+
+        const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
+            return _nonOwnedBuckets;
+        }
+        size_t removed_buckets() const noexcept { return _removed_buckets; }
+        size_t removed_documents() const noexcept { return _removed_documents; }
+    private:
+        void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
+
+        bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
+        bool storage_node_is_available(uint16_t index) const noexcept;
+
+        const lib::ClusterState _oldState;
+        const lib::ClusterState _state;
+        std::vector<bool> _available_nodes;
+        std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
+        size_t _removed_buckets;
+        size_t _removed_documents;
+
+        uint16_t _localIndex;
+        const lib::Distribution& _distribution;
+        const char* _upStates;
+        bool _track_non_owned_entries;
+
+        mutable uint64_t _cachedDecisionSuperbucket;
+        mutable bool _cachedOwned;
+    };
+
+    DistributorStripeComponent _distributorComponent;
+    const DistributorNodeContext& _node_ctx;
+    DistributorOperationContext& _op_ctx;
+    DistributorStripeInterface& _distributor_interface;
+    std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
+    std::map<uint64_t, BucketRequest> _sentMessages;
+    std::unique_ptr<PendingClusterState> _pendingClusterState;
+    std::list<PendingClusterState::Summary> _history;
+    DistributorMessageSender& _sender;
+    std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
+    OutdatedNodesMap _outdatedNodesMap;
+    framework::MilliSecTimer _transitionTimer;
+    std::atomic<bool> _stale_reads_enabled;
+    using DistributionContexts = std::unordered_map<document::BucketSpace,
+                                                    std::shared_ptr<BucketSpaceDistributionContext>,
+                                                    document::BucketSpace::hash>;
+    DistributionContexts _active_distribution_contexts;
+    using DbGuards = std::unordered_map<document::BucketSpace,
+                                        std::shared_ptr<BucketDatabase::ReadGuard>,
+                                        document::BucketSpace::hash>;
+    DbGuards _explicit_transition_read_guard;
+    mutable std::mutex _distribution_context_mutex;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index fbbc7c366fd..367a325939e 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -107,6 +107,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder) builder.add(std::move(stateManager)); } +// FIXME STRIPE not thread safe!! api::Timestamp DistributorNode::getUniqueTimestamp() {
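The FIXME above flags that getUniqueTimestamp() is not safe to call from multiple stripe threads. One conventional way to make a unique, monotonic timestamp generator thread safe is a single atomic with a compare-and-swap loop. This is only a sketch of the general technique, not the fix the Vespa authors intend; all names are hypothetical:

```cpp
#include <atomic>
#include <cstdint>

// Produces strictly increasing, never-repeating timestamps even under
// concurrent callers: each result is max(clock reading, previous + 1).
class UniqueTimestampGenerator {
    std::atomic<uint64_t> _last{0};
public:
    // now_micros: the current wall-clock reading in microseconds.
    uint64_t next(uint64_t now_micros) noexcept {
        uint64_t prev = _last.load(std::memory_order_relaxed);
        uint64_t candidate;
        do {
            // Never go backwards, and never repeat: advance by at least 1.
            candidate = (now_micros > prev) ? now_micros : (prev + 1);
        } while (!_last.compare_exchange_weak(prev, candidate,
                                              std::memory_order_relaxed));
        return candidate;
    }
};
```

A design like this trades a small amount of clock fidelity (timestamps can run slightly ahead of the clock under bursts) for lock-free uniqueness across threads.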