author    Tor Brede Vekterli <vekterli@verizonmedia.com>  2021-04-19 13:27:12 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com>  2021-04-21 14:38:42 +0000
commit    e0d7b4ae709383ff005b7b6deef5cd3c79aabdb6 (patch)
tree      bd0bded76c6ef4444aad99f5c0b6a9fa0f01ba1c /storage
parent    dc0bca9a4329ac6d4a000cf410e5b784f45bb2e5 (diff)
Initial implementation and wiring of cross-stripe state and DB handling
Introduces the concept of stripe access guards, which ensure safe, non-concurrent access to the underlying state of all running distributor stripes. Also brings back a top-level `BucketDBUpdater` component responsible for managing cluster state/distribution config and all related bucket info fetching for the node as a whole. This component abstracts away all stripe-specific operations via the new guard interface. For now, only a single stripe can be used via the new code path, and by default the legacy code path (a single stripe acts as the entire distributor) is used. The new path may be enabled via (non-live) config, but is not yet production ready.
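
For illustration, a minimal sketch of the guard pattern this commit introduces. The method names mirror call sites visible in the diff below (rendezvous_and_hold_all, update_distribution_config, set_pending_cluster_state_bundle, ...); the real interface lives in the new stripe_access_guard.h, so everything here beyond those names is an assumption:

    // Hedged sketch only -- not the verbatim contents of stripe_access_guard.h.
    #include <memory>

    namespace storage::lib { class ClusterStateBundle; }

    namespace storage::distributor {

    struct BucketSpaceDistributionConfigs; // maps bucket spaces to distribution config

    // While a StripeAccessGuard is alive, all stripe threads are held at a safe
    // rendezvous point, so cross-stripe state (cluster state, distribution
    // config, bucket DB contents) can be mutated without data races.
    class StripeAccessGuard {
    public:
        virtual ~StripeAccessGuard() = default;
        virtual void update_distribution_config(const BucketSpaceDistributionConfigs&) = 0;
        virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) = 0;
        // ...further operations elided; see stripe_access_guard.h in this diff.
    };

    // Acquired by the top-level BucketDBUpdater before any cross-stripe mutation.
    class StripeAccessor {
    public:
        virtual ~StripeAccessor() = default;
        // Blocks until every stripe has reached a safe point, then returns a
        // guard holding exclusive access until it is destroyed.
        virtual std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() = 0;
    };

    } // namespace storage::distributor

The top-level BucketDBUpdater brackets every cluster state or distribution config transition in such a guard (see the onSetSystemState and storage_distribution_changed hunks below), which is what keeps cross-stripe state and DB handling safe even though only a single stripe is wired up so far.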
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/bucketdbupdatertest.cpp                         |    2
-rw-r--r--  storage/src/tests/distributor/distributortest.cpp                             |   39
-rw-r--r--  storage/src/tests/distributor/distributortestutil.cpp                         |    4
-rw-r--r--  storage/src/tests/distributor/distributortestutil.h                           |    4
-rw-r--r--  storage/src/vespa/storage/config/distributorconfiguration.cpp                 |    3
-rw-r--r--  storage/src/vespa/storage/config/distributorconfiguration.h                   |    4
-rw-r--r--  storage/src/vespa/storage/distributor/CMakeLists.txt                          |    3
-rw-r--r--  storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp   |   17
-rw-r--r--  storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h     |   27
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.cpp                     |  768
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.h                       |  172
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.cpp                         |   84
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.h                           |   13
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_bucket_space.h              |    1
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_stripe.cpp                  |   17
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_stripe.h                    |   17
-rw-r--r--  storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp       |   88
-rw-r--r--  storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h         |   67
-rw-r--r--  storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp  |   68
-rw-r--r--  storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h    |  109
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.cpp                 |   14
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.h                   |    3
-rw-r--r--  storage/src/vespa/storage/distributor/potential_data_loss_report.h            |   22
-rw-r--r--  storage/src/vespa/storage/distributor/stripe_access_guard.h                   |   68
-rw-r--r--  storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp            | 1132
-rw-r--r--  storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h              |  284
-rw-r--r--  storage/src/vespa/storage/storageserver/distributornode.cpp                   |    1
27 files changed, 2281 insertions, 750 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 4ec49b5c6f8..97fccf58901 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -1116,7 +1116,7 @@ TEST_F(BucketDBUpdaterTest, notify_bucket_change_from_node_down) {
getBucketDBUpdater().onNotifyBucketChange(cmd);
}
// Enable here to avoid having request bucket info be silently swallowed
- // (sendRequestBucketInfo drops message if node is down).
+ // (send_request_bucket_info drops message if node is down).
enableDistributorClusterState("distributor:1 storage:2 .0.s:d");
ASSERT_EQ(std::string("BucketId(0x4000000000000001) : "
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 4c574609df5..7958306db5f 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -239,6 +239,7 @@ DistributorTest::DistributorTest()
DistributorTest::~DistributorTest() = default;
+// TODO -> stripe test
TEST_F(DistributorTest, operation_generation) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -257,6 +258,7 @@ TEST_F(DistributorTest, operation_generation) {
EXPECT_EQ("Visitor Create", testOp(cmd));
}
+// TODO -> stripe test
TEST_F(DistributorTest, operations_generated_and_started_without_duplicates) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -271,6 +273,8 @@ TEST_F(DistributorTest, operations_generated_and_started_without_duplicates) {
ASSERT_EQ(6, _sender.commands().size());
}
+// TODO -> stripe test
+// TODO also need to impl/test cross-stripe cluster state changes
TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
setupDistributor(Redundancy(1), NodeCount(2),
"storage:1 .0.s:d distributor:1");
@@ -291,6 +295,8 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
EXPECT_TRUE(_distributor->isInRecoveryMode());
}
+// TODO -> stripe test
+// TODO how to throttle across stripes?
TEST_F(DistributorTest, operations_are_throttled) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
getConfig().setMinPendingMaintenanceOps(1);
@@ -303,6 +309,7 @@ TEST_F(DistributorTest, operations_are_throttled) {
ASSERT_EQ(1, _sender.commands().size());
}
+// TODO -> stripe test
TEST_F(DistributorTest, handle_unknown_maintenance_reply) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -322,6 +329,7 @@ TEST_F(DistributorTest, handle_unknown_maintenance_reply) {
}
}
+// TODO -> generic, non distr/stripe test
TEST_F(DistributorTest, contains_time_statement) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -333,6 +341,7 @@ TEST_F(DistributorTest, contains_time_statement) {
EXPECT_TRUE(getConfig().containsTimeStatement("testdoctype1.headerfield == now() - 3600"));
}
+// TODO -> stripe test
TEST_F(DistributorTest, update_bucket_database) {
enableDistributorClusterState("distributor:1 storage:3");
@@ -402,6 +411,8 @@ public:
}
+// TODO -> stripe test
+// TODO need to impl/test cross-stripe status requests
TEST_F(DistributorTest, tick_processes_status_requests) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -430,6 +441,8 @@ TEST_F(DistributorTest, tick_processes_status_requests) {
EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)"));
}
+// TODO -> distributor test since it owns metric hook
+// TODO need to impl/test cross-stripe metrics aggregation
TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
// To ensure we count all operations, not just those fitting within the
@@ -484,6 +497,7 @@ uint64_t db_sample_interval_sec(const Distributor& d) noexcept {
}
+// TODO -> stripe test
TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) {
getClock().setAbsoluteTimeInSeconds(1000);
@@ -521,6 +535,8 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim
EXPECT_GT(now_used, last_used);
}
+// TODO -> stripe test
+// TODO need to impl/test cross-stripe config propagation
TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configuration) {
using namespace vespa::config::content::core;
@@ -557,6 +573,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati
EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets));
}
+// TODO -> stripe test
TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2");
lib::ClusterState newState("storage:10 distributor:10");
@@ -578,6 +595,7 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state
EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket));
}
+// TODO -> stripe test
TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_current_time) {
setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2");
getClock().setAbsoluteTimeInSeconds(101234);
@@ -591,6 +609,7 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur
EXPECT_EQ(101234, e->getLastGarbageCollectionTime());
}
+// TODO -> stripe test
TEST_F(DistributorTest, merge_stats_are_accumulated_during_database_iteration) {
setupDistributor(Redundancy(2), NodeCount(3), "storage:3 distributor:1");
// Copies out of sync. Not possible for distributor to _reliably_ tell
@@ -662,6 +681,7 @@ DistributorTest::assertBucketSpaceStats(size_t expBucketPending, size_t expBucke
* their state checkers at all, we won't get any statistics from any other
* operations for the bucket.
*/
+// TODO -> stripe test
TEST_F(DistributorTest, stats_generated_for_preempted_operations) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
// For this test it suffices to have a single bucket with multiple aspects
@@ -686,6 +706,7 @@ TEST_F(DistributorTest, stats_generated_for_preempted_operations) {
}
}
+// TODO -> distributor test
TEST_F(DistributorTest, host_info_reporter_config_is_propagated_to_reporter) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -699,11 +720,13 @@ TEST_F(DistributorTest, host_info_reporter_config_is_propagated_to_reporter) {
EXPECT_FALSE(distributor_host_info_reporter().isReportingEnabled());
}
+// TODO -> stripe test (though config is a bit of a special case...)
TEST_F(DistributorTest, replica_counting_mode_is_configured_to_trusted_by_default) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::TRUSTED, currentReplicaCountingMode());
}
+// TODO -> stripe test
TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_updater) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
ConfigBuilder builder;
@@ -712,6 +735,7 @@ TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_upd
EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::ANY, currentReplicaCountingMode());
}
+// TODO -> stripe test
TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_propagated_to_internal_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
ConfigBuilder builder;
@@ -720,11 +744,13 @@ TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_
EXPECT_EQ(getConfig().max_consecutively_inhibited_maintenance_ticks(), 123);
}
+// TODO -> stripe test
TEST_F(DistributorTest, bucket_activation_is_enabled_by_default) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
EXPECT_FALSE(getConfig().isBucketActivationDisabled());
}
+// TODO -> stripe test
TEST_F(DistributorTest, bucket_activation_config_is_propagated_to_distributor_configuration) {
using namespace vespa::config::content::core;
@@ -747,6 +773,7 @@ DistributorTest::configureMaxClusterClockSkew(int seconds) {
_distributor->enableNextConfig();
}
+// TODO -> stripe test
TEST_F(DistributorTest, max_clock_skew_config_is_propagated_to_distributor_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -817,6 +844,7 @@ void DistributorTest::assertNoMessageBounced() {
// TODO refactor this to set proper highest timestamp as part of bucket info
// reply once we have the "highest timestamp across all owned buckets" feature
// in place.
+// TODO where does this truly belong?
TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) {
setupDistributor(Redundancy(2), NodeCount(2),
"bits:1 storage:1 distributor:2");
@@ -846,6 +874,7 @@ void DistributorTest::configure_mutation_sequencing(bool enabled) {
_distributor->enableNextConfig();
}
+// TODO -> stripe test
TEST_F(DistributorTest, sequencing_config_is_propagated_to_distributor_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -871,6 +900,7 @@ DistributorTest::configure_merge_busy_inhibit_duration(int seconds) {
_distributor->enableNextConfig();
}
+// TODO -> stripe test
TEST_F(DistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_distributor_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -878,6 +908,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_dist
EXPECT_EQ(getConfig().getInhibitMergesOnBusyNodeDuration(), std::chrono::seconds(7));
}
+// TODO -> stripe test
TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1");
addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t");
@@ -903,6 +934,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes
EXPECT_FALSE(node_info.isBusy(0));
}
+// TODO -> stripe test
TEST_F(DistributorTest, external_client_requests_are_handled_individually_in_priority_order) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a");
@@ -931,6 +963,7 @@ TEST_F(DistributorTest, external_client_requests_are_handled_individually_in_pri
EXPECT_THAT(actual, ContainerEq(expected));
}
+// TODO -> stripe test
TEST_F(DistributorTest, internal_messages_are_started_in_fifo_order_batch) {
// To test internal request ordering, we use NotifyBucketChangeCommand
// for the reason that it explicitly updates the bucket database for
@@ -959,6 +992,8 @@ TEST_F(DistributorTest, internal_messages_are_started_in_fifo_order_batch) {
EXPECT_EQ(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
}
+// TODO -> stripe test
+// TODO also test that closing distributor closes stripes
TEST_F(DistributorTest, closing_aborts_priority_queued_client_requests) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
document::BucketId bucket(16, 1);
@@ -997,6 +1032,9 @@ void assert_invalid_stats_for_all_spaces(
}
+// TODO -> stripe test
+// TODO must impl/test cross-stripe bucket space stats
+// TODO cross-stripe recovery mode handling how?
TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
// Set up a cluster state + DB contents which implies merge maintenance ops
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
@@ -1018,6 +1056,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
assert_invalid_stats_for_all_spaces(stats, 2);
}
+// TODO figure out interaction between stripes and distributors on this one
TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_replies) {
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
// Should not send explicit replies during init stage
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 7929cc1c906..8a0e18c86af 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -66,7 +66,7 @@ DistributorTestUtil::setup_distributor(int redundancy,
// This is for all intents and purposes a hack to avoid having the
// distributor treat setting the distribution explicitly as a signal that
// it should send RequestBucketInfo to all configured nodes.
- // If we called storageDistributionChanged followed by enableDistribution
+ // If we called storage_distribution_changed followed by enableDistribution
// explicitly (which is what happens in "real life"), that is what would
// take place.
// The inverse case of this can be explicitly accomplished by calling
@@ -338,7 +338,7 @@ DistributorTestUtil::disableBucketActivationInConfig(bool disable)
getConfig().configure(config);
}
-BucketDBUpdater&
+StripeBucketDBUpdater&
DistributorTestUtil::getBucketDBUpdater() {
return _distributor->bucket_db_updater();
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index d3c0445d5b5..ddf153a1406 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -17,7 +17,7 @@ namespace framework { struct TickingThreadPool; }
namespace distributor {
-class BucketDBUpdater;
+class StripeBucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
@@ -112,7 +112,7 @@ public:
int idx = -1,
api::ReturnCode::Result result = api::ReturnCode::OK);
- BucketDBUpdater& getBucketDBUpdater();
+ StripeBucketDBUpdater& getBucketDBUpdater();
IdealStateManager& getIdealStateManager();
ExternalOperationHandler& getExternalOperationHandler();
storage::distributor::DistributorStripeComponent& distributor_component();
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index ea963b227e2..53f2e7c8413 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -31,6 +31,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_maxPendingMaintenanceOps(1000),
_maxVisitorsPerNodePerClientVisitor(4),
_minBucketsPerVisitor(5),
+ _num_distributor_stripes(0),
_maxClusterClockSkew(0),
_inhibitMergeSendingOnBusyNodeDuration(60s),
_simulated_db_pruning_latency(0),
@@ -181,6 +182,8 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
}
_simulated_db_pruning_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbPruningLatencyMsec));
_simulated_db_merging_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbMergingLatencyMsec));
+
+ _num_distributor_stripes = std::max(0, config.numDistributorStripes); // TODO STRIPE test
LOG(debug,
"Distributor now using new configuration parameters. Split limits: %d docs/%d bytes. "
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 9c1456fa9fd..479298ff082 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -265,6 +265,8 @@ public:
return _enable_revert;
}
+ uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -297,6 +299,8 @@ private:
uint32_t _maxVisitorsPerNodePerClientVisitor;
uint32_t _minBucketsPerVisitor;
+ uint32_t _num_distributor_stripes;
+
MaintenancePriorities _maintenancePriorities;
std::chrono::seconds _maxClusterClockSkew;
std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration;
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index d82d14831f4..57d6a23c79f 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(storage_distributor
activecopy.cpp
blockingoperationstarter.cpp
bucket_db_prune_elision.cpp
+ bucket_space_distribution_configs.cpp
bucket_space_distribution_context.cpp
bucketdbupdater.cpp
bucketgctimecalculator.cpp
@@ -23,6 +24,7 @@ vespa_add_library(storage_distributor
ideal_service_layer_nodes_bundle.cpp
idealstatemanager.cpp
idealstatemetricsset.cpp
+ legacy_single_stripe_accessor.cpp
messagetracker.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
@@ -40,6 +42,7 @@ vespa_add_library(storage_distributor
statechecker.cpp
statecheckers.cpp
statusreporterdelegate.cpp
+ stripe_bucket_db_updater.cpp
throttlingoperationstarter.cpp
update_metric_set.cpp
visitormetricsset.cpp
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp
new file mode 100644
index 00000000000..da5769411d4
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp
@@ -0,0 +1,17 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_space_distribution_configs.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
+#include <vespa/vdslib/distribution/distribution.h>
+
+namespace storage::distributor {
+
+BucketSpaceDistributionConfigs
+BucketSpaceDistributionConfigs::from_default_distribution(std::shared_ptr<const lib::Distribution> distribution) {
+ BucketSpaceDistributionConfigs ret;
+ ret.space_configs.emplace(document::FixedBucketSpaces::global_space(), GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution));
+ ret.space_configs.emplace(document::FixedBucketSpaces::default_space(), std::move(distribution));
+ return ret;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h
new file mode 100644
index 00000000000..9ebd8ef9732
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/bucket/bucketspace.h>
+#include <map>
+#include <memory>
+
+namespace storage::lib { class Distribution; }
+
+namespace storage::distributor {
+
+/**
+ * Represents a complete mapping of all known bucket spaces to their appropriate,
+ * (possibly derived) distribution config.
+ */
+struct BucketSpaceDistributionConfigs {
+ std::map<document::BucketSpace, std::shared_ptr<const lib::Distribution>> space_configs;
+
+ std::shared_ptr<const lib::Distribution> get_or_nullptr(document::BucketSpace space) const noexcept {
+ auto iter = space_configs.find(space);
+ return (iter != space_configs.end()) ? iter->second : std::shared_ptr<const lib::Distribution>();
+ }
+
+ static BucketSpaceDistributionConfigs from_default_distribution(std::shared_ptr<const lib::Distribution>);
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 6735ea1e533..6dcb5ba732e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,12 +2,15 @@
#include "bucketdbupdater.h"
#include "bucket_db_prune_elision.h"
+#include "bucket_space_distribution_configs.h"
#include "bucket_space_distribution_context.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
#include "simpleclusterinformation.h"
+#include "stripe_access_guard.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/vdslib/distribution/distribution.h>
@@ -24,74 +27,71 @@ using document::BucketSpace;
namespace storage::distributor {
-BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, // FIXME STRIPE!
DistributorMessageSender& sender,
- DistributorComponentRegister& compReg)
- : framework::StatusReporter("bucketdb", "Bucket DB Updater"),
- _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
- _node_ctx(_distributorComponent),
- _op_ctx(_distributorComponent),
- _distributor_interface(_distributorComponent.getDistributor()),
- _delayedRequests(),
- _sentMessages(),
- _pendingClusterState(),
+ DistributorComponentRegister& comp_reg,
+ StripeAccessor& stripe_accessor)
+ : framework::StatusReporter("temp_bucketdb", "Bucket DB Updater"), // TODO STRIPE rename once duplication is removed
+ _stripe_accessor(stripe_accessor),
+ _active_state_bundle(lib::ClusterState()),
+ _dummy_mutable_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())),
+ _dummy_read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())),
+ _distributor_component(owner, *_dummy_mutable_bucket_space_repo, *_dummy_read_only_bucket_space_repo, comp_reg, "Bucket DB Updater"),
+ _node_ctx(_distributor_component),
+ _op_ctx(_distributor_component),
+ _distributor_interface(_distributor_component.getDistributor()),
+ _delayed_requests(),
+ _sent_messages(),
+ _pending_cluster_state(),
_history(),
_sender(sender),
- _enqueuedRechecks(),
- _outdatedNodesMap(),
- _transitionTimer(_node_ctx.clock()),
- _stale_reads_enabled(false),
- _active_distribution_contexts(),
- _explicit_transition_read_guard(),
- _distribution_context_mutex()
+ _enqueued_rechecks(),
+ _outdated_nodes_map(),
+ _transition_timer(_node_ctx.clock()),
+ _stale_reads_enabled(false)
{
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- _active_distribution_contexts.emplace(
- elem.first,
- BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
- _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
- }
+ // FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle!
+ propagate_active_state_bundle_internally();
+ bootstrap_distribution_config(_distributor_component.getDistribution());
}
BucketDBUpdater::~BucketDBUpdater() = default;
-OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
- const auto bucket_space = bucket.getBucketSpace();
- std::lock_guard lock(_distribution_context_mutex);
- auto active_state_iter = _active_distribution_contexts.find(bucket_space);
- assert(active_state_iter != _active_distribution_contexts.cend());
- auto& state = *active_state_iter->second;
- if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
- return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+void
+BucketDBUpdater::propagate_active_state_bundle_internally() {
+ for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) {
+ for (auto& iter : *repo) {
+ iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first));
+ }
}
- const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId());
- if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
- return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+}
+
+void
+BucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) {
+ auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
+ for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) {
+ repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
+ repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
}
- const auto& space_repo = bucket_present_in_mutable_db
- ? _op_ctx.bucket_space_repo()
- : _op_ctx.read_only_bucket_space_repo();
- auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
- assert(existing_guard_iter != _explicit_transition_read_guard.cend());
- auto db_guard = existing_guard_iter->second
- ? existing_guard_iter-> second
- : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard();
- return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo);
+ // TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition?
+ // ... need to take a guard if so, so can probably not be done at ctor time..?
}
+// TODO STRIPE what to do with merge guards...
+// FIXME what about bucket DB replica update timestamp allocations?! Replace with u64 counter..?
+// Must at the very least ensure we use stripe-local TS generation for DB inserts...! i.e. no global TS
+// Or do we have to touch these at all here? Just defer all this via stripe interface?
void
BucketDBUpdater::flush()
{
- for (auto & entry : _sentMessages) {
+ for (auto & entry : _sent_messages) {
// Cannot sendDown MergeBucketReplies during flushing, since
// all lower links have been closed
if (entry.second._mergeReplyGuard) {
entry.second._mergeReplyGuard->resetReply();
}
}
- _sentMessages.clear();
+ _sent_messages.clear();
}
void
@@ -102,26 +102,19 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden
}
bool
-BucketDBUpdater::shouldDeferStateEnabling() const noexcept
+BucketDBUpdater::should_defer_state_enabling() const noexcept
{
return stale_reads_enabled();
}
bool
-BucketDBUpdater::hasPendingClusterState() const
+BucketDBUpdater::has_pending_cluster_state() const
{
- return static_cast<bool>(_pendingClusterState);
-}
-
-const lib::ClusterState*
-BucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const {
- return (hasPendingClusterState()
- ? _pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(space).get()
- : nullptr);
+ return static_cast<bool>(_pending_cluster_state);
}
void
-BucketDBUpdater::sendRequestBucketInfo(
+BucketDBUpdater::send_request_bucket_info(
uint16_t node,
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
@@ -135,116 +128,50 @@ BucketDBUpdater::sendRequestBucketInfo(
auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets);
- LOG(debug,
- "Sending request bucket info command %" PRIu64 " for "
- "bucket %s to node %u",
- msg->getMsgId(),
- bucket.toString().c_str(),
- node);
+ LOG(debug, "Sending request bucket info command %" PRIu64 " for bucket %s to node %u",
+ msg->getMsgId(), bucket.toString().c_str(), node);
msg->setPriority(50);
msg->setAddress(_node_ctx.node_address(node));
- _sentMessages[msg->getMsgId()] =
+ _sent_messages[msg->getMsgId()] =
BucketRequest(node, _op_ctx.generate_unique_timestamp(),
bucket, mergeReplyGuard);
_sender.sendCommand(msg);
}
void
-BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
- const document::Bucket& bucket)
-{
- sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
-}
-
-namespace {
-
-class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor {
- using NewEntries = std::vector<BucketDatabase::Entry>;
- NewEntries::const_iterator _current;
- const NewEntries::const_iterator _last;
-public:
- explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries)
- : _current(new_entries.cbegin()),
- _last(new_entries.cend())
- {}
-
- Result merge(BucketDatabase::Merger& m) override {
- const uint64_t key_to_insert = m.bucket_key();
- uint64_t key_at_cursor = 0;
- while (_current != _last) {
- key_at_cursor = _current->getBucketId().toKey();
- if (key_at_cursor >= key_to_insert) {
- break;
- }
- m.insert_before_current(_current->getBucketId(), *_current);
- ++_current;
- }
- if ((_current != _last) && (key_at_cursor == key_to_insert)) {
- // If we encounter a bucket that already exists, replace value wholesale.
- // Don't try to cleverly merge replicas, as the values we currently hold
- // in the read-only DB may be stale.
- // Note that this case shouldn't really happen, since we only add previously
- // owned buckets to the read-only DB, and subsequent adds to a non-empty DB
- // can only happen for state preemptions. Since ownership is not regained
- // before a state is stable, a bucket is only added once. But we handle it
- // anyway in case this changes at some point in the future.
- m.current_entry() = *_current;
- return Result::Update;
- }
- return Result::KeepUnchanged;
- }
-
- void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override {
- for (; _current != _last; ++_current) {
- inserter.insert_at_end(_current->getBucketId(), *_current);
- }
- }
-};
-
-}
-
-void
-BucketDBUpdater::removeSuperfluousBuckets(
- const lib::ClusterStateBundle& newState,
+BucketDBUpdater::remove_superfluous_buckets(
+ StripeAccessGuard& guard,
+ const lib::ClusterStateBundle& new_state,
bool is_distribution_config_change)
{
- const bool move_to_read_only_db = shouldDeferStateEnabling();
const char* up_states = _op_ctx.storage_node_up_states();
+ // TODO STRIPE explicit space -> config mapping, don't get via repo
+ // ... but we need to get the current cluster state per space..!
for (auto& elem : _op_ctx.bucket_space_repo()) {
- const auto& newDistribution(elem.second->getDistribution());
- const auto& oldClusterState(elem.second->getClusterState());
- const auto& new_cluster_state = newState.getDerivedClusterState(elem.first);
+ const auto& old_cluster_state(elem.second->getClusterState());
+ const auto& new_cluster_state = new_state.getDerivedClusterState(elem.first);
// Running a full DB sweep is expensive, so if the cluster state transition does
// not actually indicate that buckets should possibly be removed, we elide it entirely.
if (!is_distribution_config_change
- && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states))
+ && db_pruning_may_be_elided(old_cluster_state, *new_cluster_state, up_states))
{
LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'",
document::FixedBucketSpaces::to_string(elem.first).data(),
- oldClusterState.toString().c_str(), new_cluster_state->toString().c_str());
+ old_cluster_state.toString().c_str(), new_cluster_state->toString().c_str());
continue;
}
-
- auto& bucketDb(elem.second->getBucketDatabase());
- auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase());
-
- // Remove all buckets not belonging to this distributor, or
- // being on storage nodes that are no longer up.
- MergingNodeRemover proc(
- oldClusterState,
- *new_cluster_state,
- _node_ctx.node_index(),
- newDistribution,
- up_states,
- move_to_read_only_db);
-
- bucketDb.merge(proc);
- if (move_to_read_only_db) {
- ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
- readOnlyDb.merge(read_only_merger);
+ // TODO STRIPE should we also pass old state and distr config? Must ensure we're in sync with stripe...
+ // .. but config is set synchronously via the guard upon pending state creation edge
+ auto maybe_lost = guard.remove_superfluous_buckets(elem.first, *new_cluster_state, is_distribution_config_change);
+ if (maybe_lost.buckets != 0) {
+ LOGBM(info, "After cluster state change %s, %zu buckets no longer "
+ "have available replicas. %zu documents in these buckets will "
+ "be unavailable until nodes come back up",
+ old_cluster_state.getTextualDifference(*new_cluster_state).c_str(),
+ maybe_lost.buckets, maybe_lost.documents);
}
maybe_inject_simulated_db_pruning_delay();
}
@@ -271,63 +198,58 @@ BucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
}
void
-BucketDBUpdater::ensureTransitionTimerStarted()
+BucketDBUpdater::ensure_transition_timer_started()
{
// Don't overwrite start time if we're already processing a state, as
// that will make transition times appear artificially low.
- if (!hasPendingClusterState()) {
- _transitionTimer = framework::MilliSecTimer(
- _node_ctx.clock());
+ if (!has_pending_cluster_state()) {
+ _transition_timer = framework::MilliSecTimer(_node_ctx.clock());
}
}
void
-BucketDBUpdater::completeTransitionTimer()
+BucketDBUpdater::complete_transition_timer()
{
_distributor_interface.getMetrics()
- .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble());
+ .stateTransitionTime.addValue(_transition_timer.getElapsedTimeAsDouble());
}
void
-BucketDBUpdater::clearReadOnlyBucketRepoDatabases()
+BucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfigs& configs)
{
- for (auto& space : _op_ctx.read_only_bucket_space_repo()) {
- space.second->getBucketDatabase().clear();
- }
-}
+ ensure_transition_timer_started();
-void
-BucketDBUpdater::storageDistributionChanged()
-{
- ensureTransitionTimerStarted();
-
- removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true);
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ // FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?!
+ guard->update_distribution_config(configs);
+ remove_superfluous_buckets(*guard, _op_ctx.cluster_state_bundle(), true);
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
_op_ctx.cluster_state_bundle(),
_op_ctx.storage_node_up_states());
- _pendingClusterState = PendingClusterState::createForDistributionChange(
+ _pending_cluster_state = PendingClusterState::createForDistributionChange(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _op_ctx.bucket_space_repo(),
- _op_ctx.generate_unique_timestamp());
- _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
- _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ _op_ctx.bucket_space_repo(), // TODO STRIPE cannot use!
+ _op_ctx.generate_unique_timestamp()); // TODO STRIPE must ensure no stripes can generate < this
+ _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap();
+
+ guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle());
}
void
-BucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
+BucketDBUpdater::reply_to_previous_pending_cluster_state_if_any()
{
- if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
+ if (_pending_cluster_state.get() && _pending_cluster_state->hasCommand()) {
_distributor_interface.getMessageSender().sendUp(
- std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
+ std::make_shared<api::SetSystemStateReply>(*_pending_cluster_state->getCommand()));
}
}
void
-BucketDBUpdater::replyToActivationWithActualVersion(
+BucketDBUpdater::reply_to_activation_with_actual_version(
const api::ActivateClusterStateVersionCommand& cmd,
uint32_t actualVersion)
{
@@ -336,104 +258,49 @@ BucketDBUpdater::replyToActivationWithActualVersion(
_distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
}
-void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
- std::lock_guard lock(_distribution_context_mutex);
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- // At this point, we're still operating with a distribution context _without_ a
- // pending state, i.e. anyone using the context will expect to find buckets
- // in the DB that correspond to how the database looked like prior to pruning
- // buckets from the DB. To ensure this is not violated, take a snapshot of the
- // _mutable_ DB and expose this. This snapshot only lives until we atomically
- // flip to expose a distribution context that includes the new, pending state.
- // At that point, the read-only DB is known to contain the buckets that have
- // been pruned away, so we can release the mutable DB snapshot safely.
- // TODO test for, and handle, state preemption case!
- _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
- }
-}
-
-
-void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
- std::lock_guard lock(_distribution_context_mutex);
- const auto old_default_state = _op_ctx.bucket_space_repo().get(
- document::FixedBucketSpaces::default_space()).cluster_state_sp();
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- auto new_distribution = elem.second->distribution_sp();
- auto old_cluster_state = elem.second->cluster_state_sp();
- auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
- _active_distribution_contexts.insert_or_assign(
- elem.first,
- BucketSpaceDistributionContext::make_state_transition(
- std::move(old_cluster_state),
- old_default_state,
- std::move(new_cluster_state),
- std::move(new_distribution),
- _node_ctx.node_index()));
- // We can now remove the explicit mutable DB snapshot, as the buckets that have been
- // pruned away are visible in the read-only DB.
- _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
- }
-}
-
-void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
- std::lock_guard lock(_distribution_context_mutex);
- const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- auto new_distribution = elem.second->distribution_sp();
- auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
- _active_distribution_contexts.insert_or_assign(
- elem.first,
- BucketSpaceDistributionContext::make_stable_state(
- std::move(new_cluster_state),
- default_cluster_state,
- std::move(new_distribution),
- _node_ctx.node_index()));
- }
-}
-
bool
BucketDBUpdater::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& cmd)
{
- LOG(debug,
- "Received new cluster state %s",
+ LOG(debug, "Received new cluster state %s",
cmd->getSystemState().toString().c_str());
- const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
- if (state == oldState) {
+ if (state == _active_state_bundle) {
return false;
}
- ensureTransitionTimerStarted();
- // Separate timer since _transitionTimer might span multiple pending states.
+ ensure_transition_timer_started();
+ // Separate timer since _transition_timer might span multiple pending states.
framework::MilliSecTimer process_timer(_node_ctx.clock());
- update_read_snapshot_before_db_pruning();
+
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ guard->update_read_snapshot_before_db_pruning();
const auto& bundle = cmd->getClusterStateBundle();
- removeSuperfluousBuckets(bundle, false);
- update_read_snapshot_after_db_pruning(bundle);
- replyToPreviousPendingClusterStateIfAny();
+ remove_superfluous_buckets(*guard, bundle, false);
+ guard->update_read_snapshot_after_db_pruning(bundle);
+ reply_to_previous_pending_cluster_state_if_any();
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
_op_ctx.cluster_state_bundle(),
_op_ctx.storage_node_up_states());
- _pendingClusterState = PendingClusterState::createForClusterStateChange(
+ _pending_cluster_state = PendingClusterState::createForClusterStateChange(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _op_ctx.bucket_space_repo(),
+ _op_ctx.bucket_space_repo(), // TODO STRIPE remove
cmd,
- _outdatedNodesMap,
- _op_ctx.generate_unique_timestamp());
- _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+ _outdated_nodes_map,
+ _op_ctx.generate_unique_timestamp()); // FIXME STRIPE must be atomic across all threads
+ _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap();
_distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
- _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
- if (isPendingClusterStateCompleted()) {
- processCompletedPendingClusterState();
+ guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle());
+ if (is_pending_cluster_state_completed()) {
+ process_completed_pending_cluster_state(*guard);
}
return true;
}
@@ -441,25 +308,26 @@ BucketDBUpdater::onSetSystemState(
bool
BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
{
- if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
- const auto pending_version = _pendingClusterState->clusterStateVersion();
+ if (has_pending_cluster_state() && _pending_cluster_state->isVersionedTransition()) {
+ const auto pending_version = _pending_cluster_state->clusterStateVersion();
if (pending_version == cmd->version()) {
- if (isPendingClusterStateCompleted()) {
- assert(_pendingClusterState->isDeferred());
- activatePendingClusterState();
+ if (is_pending_cluster_state_completed()) {
+ assert(_pending_cluster_state->isDeferred());
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ activate_pending_cluster_state(*guard);
} else {
LOG(error, "Received cluster state activation for pending version %u "
"without pending state being complete yet. This is not expected, "
"as no activation should be sent before all distributors have "
"reported that state processing is complete.", pending_version);
- replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+ reply_to_activation_with_actual_version(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
return true;
}
} else {
- replyToActivationWithActualVersion(*cmd, pending_version);
+ reply_to_activation_with_actual_version(*cmd, pending_version);
return true;
}
- } else if (shouldDeferStateEnabling()) {
+ } else if (should_defer_state_enabling()) {
// Likely just a resend, but log warn for now to get a feel of how common it is.
LOG(warning, "Received cluster state activation command for version %u, which "
"has no corresponding pending state. Likely resent operation.", cmd->version());
@@ -471,6 +339,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa
return false;
}
+// TODO remove entirely from this abstraction level?
BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
{
if (_reply) {
@@ -488,71 +357,36 @@ BucketDBUpdater::onMergeBucketReply(
// actually merged (source-only nodes?) we request the bucket info of the
// bucket again to make sure it's ok.
for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
- sendRequestBucketInfo(reply->getNodes()[i].index,
- reply->getBucket(),
- replyGuard);
+ send_request_bucket_info(reply->getNodes()[i].index,
+ reply->getBucket(),
+ replyGuard);
}
return true;
}
void
-BucketDBUpdater::enqueueRecheckUntilPendingStateEnabled(
+BucketDBUpdater::enqueue_recheck_until_pending_state_enabled(
uint16_t node,
const document::Bucket& bucket)
{
- LOG(spam,
- "DB updater has a pending cluster state, enqueuing recheck "
- "of bucket %s on node %u until state is done processing",
- bucket.toString().c_str(),
- node);
- _enqueuedRechecks.insert(EnqueuedBucketRecheck(node, bucket));
+ LOG(spam, "DB updater has a pending cluster state, enqueuing recheck "
+ "of bucket %s on node %u until state is done processing",
+ bucket.toString().c_str(), node);
+ _enqueued_rechecks.insert(EnqueuedBucketRecheck(node, bucket));
}
void
-BucketDBUpdater::sendAllQueuedBucketRechecks()
+BucketDBUpdater::send_all_queued_bucket_rechecks()
{
- LOG(spam,
- "Sending %zu queued bucket rechecks previously received "
- "via NotifyBucketChange commands",
- _enqueuedRechecks.size());
+ LOG(spam, "Sending %zu queued bucket rechecks previously received "
+ "via NotifyBucketChange commands",
+ _enqueued_rechecks.size());
- for (const auto & entry :_enqueuedRechecks) {
- sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
+ for (const auto & entry :_enqueued_rechecks) {
+ send_request_bucket_info(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
}
- _enqueuedRechecks.clear();
-}
-
-bool
-BucketDBUpdater::onNotifyBucketChange(
- const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd)
-{
- // Immediately schedule reply to ensure it is sent.
- _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd));
-
- if (!cmd->getBucketInfo().valid()) {
- LOG(error,
- "Received invalid bucket info for bucket %s from notify bucket "
- "change! Not updating bucket.",
- cmd->getBucketId().toString().c_str());
- return true;
- }
- LOG(debug,
- "Received notify bucket change from node %u for bucket %s with %s.",
- cmd->getSourceIndex(),
- cmd->getBucketId().toString().c_str(),
- cmd->getBucketInfo().toString().c_str());
-
- if (hasPendingClusterState()) {
- enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(),
- cmd->getBucket());
- } else {
- sendRequestBucketInfo(cmd->getSourceIndex(),
- cmd->getBucket(),
- std::shared_ptr<MergeReplyGuard>());
- }
-
- return true;
+ _enqueued_rechecks.clear();
}
bool sort_pred(const BucketListMerger::BucketEntry& left,
@@ -563,34 +397,34 @@ bool sort_pred(const BucketListMerger::BucketEntry& left,
bool
BucketDBUpdater::onRequestBucketInfoReply(
- const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
- if (pendingClusterStateAccepted(repl)) {
+ if (pending_cluster_state_accepted(repl)) {
return true;
}
- return processSingleBucketInfoReply(repl);
+ return process_single_bucket_info_reply(repl);
}
bool
-BucketDBUpdater::pendingClusterStateAccepted(
- const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+BucketDBUpdater::pending_cluster_state_accepted(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
- if (_pendingClusterState.get()
- && _pendingClusterState->onRequestBucketInfoReply(repl))
+ if (_pending_cluster_state.get()
+ && _pending_cluster_state->onRequestBucketInfoReply(repl))
{
- if (isPendingClusterStateCompleted()) {
- processCompletedPendingClusterState();
+ if (is_pending_cluster_state_completed()) {
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ process_completed_pending_cluster_state(*guard);
}
return true;
}
- LOG(spam,
- "Reply %s was not accepted by pending cluster state",
+ LOG(spam, "Reply %s was not accepted by pending cluster state",
repl->toString().c_str());
return false;
}
void
-BucketDBUpdater::handleSingleBucketInfoFailure(
+BucketDBUpdater::handle_single_bucket_info_failure(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req)
{
@@ -600,31 +434,31 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
if (req.bucket.getBucketId() != document::BucketId(0)) {
framework::MilliSecTime sendTime(_node_ctx.clock());
sendTime += framework::MilliSecTime(100);
- _delayedRequests.emplace_back(sendTime, req);
+ _delayed_requests.emplace_back(sendTime, req);
}
}
void
-BucketDBUpdater::resendDelayedMessages()
+BucketDBUpdater::resend_delayed_messages()
{
- if (_pendingClusterState) {
- _pendingClusterState->resendDelayedMessages();
+ if (_pending_cluster_state) {
+ _pending_cluster_state->resendDelayedMessages();
}
- if (_delayedRequests.empty()) {
+ if (_delayed_requests.empty()) {
return; // Don't fetch time if not needed
}
framework::MilliSecTime currentTime(_node_ctx.clock());
- while (!_delayedRequests.empty()
- && currentTime >= _delayedRequests.front().first)
+ while (!_delayed_requests.empty()
+ && currentTime >= _delayed_requests.front().first)
{
- BucketRequest& req(_delayedRequests.front().second);
- sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
- _delayedRequests.pop_front();
+ BucketRequest& req(_delayed_requests.front().second);
+ send_request_bucket_info(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
+ _delayed_requests.pop_front();
}
}
void
-BucketDBUpdater::convertBucketInfoToBucketList(
+BucketDBUpdater::convert_bucket_info_to_bucket_list(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
uint16_t targetNode, BucketListMerger::BucketList& newList)
{
@@ -637,51 +471,51 @@ BucketDBUpdater::convertBucketInfoToBucketList(
}
void
-BucketDBUpdater::mergeBucketInfoWithDatabase(
+BucketDBUpdater::merge_bucket_info_with_database(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req)
{
BucketListMerger::BucketList existing;
BucketListMerger::BucketList newList;
- findRelatedBucketsInDatabase(req.targetNode, req.bucket, existing);
- convertBucketInfoToBucketList(repl, req.targetNode, newList);
+ find_related_buckets_in_database(req.targetNode, req.bucket, existing);
+ convert_bucket_info_to_bucket_list(repl, req.targetNode, newList);
std::sort(existing.begin(), existing.end(), sort_pred);
std::sort(newList.begin(), newList.end(), sort_pred);
BucketListMerger merger(newList, existing, req.timestamp);
- updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger);
+ update_database(req.bucket.getBucketSpace(), req.targetNode, merger);
}
bool
-BucketDBUpdater::processSingleBucketInfoReply(
+BucketDBUpdater::process_single_bucket_info_reply(
const std::shared_ptr<api::RequestBucketInfoReply> & repl)
{
- auto iter = _sentMessages.find(repl->getMsgId());
+ auto iter = _sent_messages.find(repl->getMsgId());
// Has probably been deleted for some reason earlier.
- if (iter == _sentMessages.end()) {
+ if (iter == _sent_messages.end()) {
return true;
}
BucketRequest req = iter->second;
- _sentMessages.erase(iter);
+ _sent_messages.erase(iter);
if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
// Ignore replies from nodes that are down.
return true;
}
if (repl->getResult().getResult() != api::ReturnCode::OK) {
- handleSingleBucketInfoFailure(repl, req);
+ handle_single_bucket_info_failure(repl, req);
return true;
}
- mergeBucketInfoWithDatabase(repl, req);
+ merge_bucket_info_with_database(repl, req);
return true;
}
void
-BucketDBUpdater::addBucketInfoForNode(
+BucketDBUpdater::add_bucket_info_for_node(
const BucketDatabase::Entry& e,
uint16_t node,
BucketListMerger::BucketList& existing) const
@@ -693,20 +527,20 @@ BucketDBUpdater::addBucketInfoForNode(
}
void
-BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
- BucketListMerger::BucketList& existing)
+BucketDBUpdater::find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing)
{
auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
std::vector<BucketDatabase::Entry> entries;
distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
for (const BucketDatabase::Entry & entry : entries) {
- addBucketInfoForNode(entry, node, existing);
+ add_bucket_info_for_node(entry, node, existing);
}
}
void
-BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
+BucketDBUpdater::update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
{
for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
document::Bucket bucket(bucketSpace, bucketId);
@@ -723,18 +557,18 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node
}
bool
-BucketDBUpdater::isPendingClusterStateCompleted() const
+BucketDBUpdater::is_pending_cluster_state_completed() const
{
- return _pendingClusterState.get() && _pendingClusterState->done();
+ return _pending_cluster_state.get() && _pending_cluster_state->done();
}
void
-BucketDBUpdater::processCompletedPendingClusterState()
+BucketDBUpdater::process_completed_pending_cluster_state(StripeAccessGuard& guard)
{
- if (_pendingClusterState->isDeferred()) {
+ if (_pending_cluster_state->isDeferred()) {
LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
- _pendingClusterState->clusterStateVersion());
- assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+ _pending_cluster_state->clusterStateVersion());
+ assert(_pending_cluster_state->hasCommand()); // Deferred transitions should only ever be created by state commands.
// Sending down SetSystemState command will reach the state manager and a reply
// will be auto-sent back to the cluster controller in charge. Once this happens,
// it will send an explicit activation command once all distributors have reported
@@ -743,73 +577,81 @@ BucketDBUpdater::processCompletedPendingClusterState()
// taken effect via activation. External operation handler will keep operations from
// actually being scheduled until state has been activated. The external operation handler
// needs to be explicitly aware of the case where no state has yet to be activated.
- _distributor_interface.getMessageSender().sendDown(
- _pendingClusterState->getCommand());
- _pendingClusterState->clearCommand();
+ _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand());
+ _pending_cluster_state->clearCommand();
return;
}
// Distribution config change or non-deferred cluster state. Immediately activate
// the pending state without being told to do so explicitly.
- activatePendingClusterState();
+ activate_pending_cluster_state(guard);
}
void
-BucketDBUpdater::activatePendingClusterState()
+BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard)
{
framework::MilliSecTimer process_timer(_node_ctx.clock());
- _pendingClusterState->mergeIntoBucketDatabases();
+ _pending_cluster_state->merge_into_bucket_databases(guard);
maybe_inject_simulated_db_merging_delay();
- if (_pendingClusterState->isVersionedTransition()) {
- LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
- enableCurrentClusterStateBundleInDistributor();
- if (_pendingClusterState->hasCommand()) {
- _distributor_interface.getMessageSender().sendDown(
- _pendingClusterState->getCommand());
+ if (_pending_cluster_state->isVersionedTransition()) {
+ LOG(debug, "Activating pending cluster state version %u", _pending_cluster_state->clusterStateVersion());
+ enable_current_cluster_state_bundle_in_distributor_and_stripes(guard);
+ if (_pending_cluster_state->hasCommand()) {
+ _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand());
}
- addCurrentStateToClusterStateHistory();
+ add_current_state_to_cluster_state_history();
} else {
LOG(debug, "Activating pending distribution config");
// TODO distribution changes cannot currently be deferred as they are not
// initiated by the cluster controller!
- _distributor_interface.notifyDistributionChangeEnabled();
+ _distributor_interface.notifyDistributionChangeEnabled(); // TODO factor these two out into one func?
+ guard.notify_distribution_change_enabled();
}
- update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
- _pendingClusterState.reset();
- _outdatedNodesMap.clear();
- _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle();
- sendAllQueuedBucketRechecks();
- completeTransitionTimer();
- clearReadOnlyBucketRepoDatabases();
+ guard.update_read_snapshot_after_activation(_pending_cluster_state->getNewClusterStateBundle());
+ _pending_cluster_state.reset();
+ _outdated_nodes_map.clear();
+ guard.clear_pending_cluster_state_bundle();
+ send_all_queued_bucket_rechecks();
+ complete_transition_timer();
+ guard.clear_read_only_bucket_repo_databases();
_distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
}
void
-BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
+BucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard)
{
- const lib::ClusterStateBundle& state(
- _pendingClusterState->getNewClusterStateBundle());
+ const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle();
- LOG(debug,
- "BucketDBUpdater finished processing state %s",
+ _active_state_bundle = _pending_cluster_state->getNewClusterStateBundle();
+ propagate_active_state_bundle_internally();
+
+ LOG(debug, "BucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
+ // First enable the cluster state for the _top-level_ distributor component.
_distributor_interface.enableClusterStateBundle(state);
+    // Then for all underlying stripes. Technically the order doesn't matter,
+    // since all threads are blocked at this point.
+ guard.enable_cluster_state_bundle(state);
}
void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
- update_read_snapshot_after_activation(activated_state);
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
_distributor_interface.enableClusterStateBundle(activated_state);
+ guard->enable_cluster_state_bundle(activated_state);
+
+ _active_state_bundle = activated_state;
+ propagate_active_state_bundle_internally();
}
void
-BucketDBUpdater::addCurrentStateToClusterStateHistory()
+BucketDBUpdater::add_current_state_to_cluster_state_history()
{
- _history.push_back(_pendingClusterState->getSummary());
+ _history.push_back(_pending_cluster_state->getSummary());
if (_history.size() > 50) {
_history.pop_front();
@@ -857,22 +699,22 @@ BucketDBUpdater::reportStatus(std::ostream& out,
xos << XmlTag("status")
<< XmlAttribute("id", BUCKETDB)
<< XmlAttribute("name", BUCKETDB_UPDATER);
- reportXmlStatus(xos, path);
+ report_xml_status(xos, path);
xos << XmlEndTag();
return true;
}
vespalib::string
-BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
- const framework::HttpUrlPath&) const
+BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos,
+ const framework::HttpUrlPath&) const
{
using namespace vespalib::xml;
xos << XmlTag("bucketdb")
<< XmlTag("systemstate_active")
<< XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
<< XmlEndTag();
- if (_pendingClusterState) {
- xos << *_pendingClusterState;
+ if (_pending_cluster_state) {
+ xos << *_pending_cluster_state;
}
xos << XmlTag("systemstate_history");
for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) {
@@ -884,13 +726,13 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
}
xos << XmlEndTag()
<< XmlTag("single_bucket_requests");
- for (const auto & entry : _sentMessages)
+ for (const auto & entry : _sent_messages)
{
entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp));
}
xos << XmlEndTag()
<< XmlTag("delayed_single_bucket_requests");
- for (const auto & entry : _delayedRequests)
+ for (const auto & entry : _delayed_requests)
{
entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime()));
}
@@ -898,166 +740,4 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
return "";
}
-BucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
- const lib::ClusterState& oldState,
- const lib::ClusterState& s,
- uint16_t localIndex,
- const lib::Distribution& distribution,
- const char* upStates,
- bool track_non_owned_entries)
- : _oldState(oldState),
- _state(s),
- _available_nodes(),
- _nonOwnedBuckets(),
- _removed_buckets(0),
- _removed_documents(0),
- _localIndex(localIndex),
- _distribution(distribution),
- _upStates(upStates),
- _track_non_owned_entries(track_non_owned_entries),
- _cachedDecisionSuperbucket(UINT64_MAX),
- _cachedOwned(false)
-{
- // TODO intersection of cluster state and distribution config
- const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
- _available_nodes.resize(storage_count);
- for (uint16_t i = 0; i < storage_count; ++i) {
- if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) {
- _available_nodes[i] = true;
- }
- }
-}
-
-void
-BucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
-{
- LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
-}
-
-namespace {
-
-uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
- // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
- return id.getRawId() & ~(UINT64_MAX << distribution_bits);
-}
-
-}
-
-bool
-BucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
- const document::BucketId& bucketId) const
-{
- // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
- // TODO "too few bits used" case can be cheaply checked without needing exception
- try {
- const auto bits = _state.getDistributionBitCount();
- const auto this_superbucket = superbucket_from_id(bucketId, bits);
- if (_cachedDecisionSuperbucket == this_superbucket) {
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor (cached)");
- }
- return _cachedOwned;
- }
-
- uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim");
- _cachedDecisionSuperbucket = this_superbucket;
- _cachedOwned = (distributor == _localIndex);
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor");
- return false;
- }
- return true;
- } catch (lib::TooFewBucketBitsInUseException& exc) {
- logRemove(bucketId, "using too few distribution bits now");
- } catch (lib::NoDistributorsAvailableException& exc) {
- logRemove(bucketId, "no distributors are available");
- }
- return false;
-}
-
-void
-BucketDBUpdater::MergingNodeRemover::setCopiesInEntry(
- BucketDatabase::Entry& e,
- const std::vector<BucketCopy>& copies) const
-{
- e->clear();
-
- std::vector<uint16_t> order =
- _distribution.getIdealStorageNodes(_state, e.getBucketId(), _upStates);
-
- e->addNodes(copies, order);
-
- LOG(spam, "Changed %s", e->toString().c_str());
-}
-
-bool
-BucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const
-{
- const uint16_t n_nodes = e->getNodeCount();
- for (uint16_t i = 0; i < n_nodes; i++) {
- const uint16_t node_idx = e->getNodeRef(i).getNode();
- if (!storage_node_is_available(node_idx)) {
- return true;
- }
- }
- return false;
-}
-
-BucketDatabase::MergingProcessor::Result
-BucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger)
-{
- document::BucketId bucketId(merger.bucket_id());
- LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str());
- if (!distributorOwnsBucket(bucketId)) {
- // TODO remove in favor of DB snapshotting
- if (_track_non_owned_entries) {
- _nonOwnedBuckets.emplace_back(merger.current_entry());
- }
- return Result::Skip;
- }
- auto& e = merger.current_entry();
-
- if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger?
- return Result::Skip;
- }
-
- if (!has_unavailable_nodes(e)) {
- return Result::KeepUnchanged;
- }
-
- std::vector<BucketCopy> remainingCopies;
- for (uint16_t i = 0; i < e->getNodeCount(); i++) {
- const uint16_t node_idx = e->getNodeRef(i).getNode();
- if (storage_node_is_available(node_idx)) {
- remainingCopies.push_back(e->getNodeRef(i));
- }
- }
-
- if (remainingCopies.empty()) {
- ++_removed_buckets;
- _removed_documents += e->getHighestDocumentCount();
- return Result::Skip;
- } else {
- setCopiesInEntry(e, remainingCopies);
- return Result::Update;
- }
-}
-
-bool
-BucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept
-{
- return ((index < _available_nodes.size()) && _available_nodes[index]);
-}
-
-BucketDBUpdater::MergingNodeRemover::~MergingNodeRemover()
-{
- if (_removed_buckets != 0) {
- LOGBM(info, "After cluster state change %s, %zu buckets no longer "
- "have available replicas. %zu documents in these buckets will "
- "be unavailable until nodes come back up",
- _oldState.getTextualDifference(_state).c_str(),
- _removed_buckets, _removed_documents);
- }
-}
-
} // distributor
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 375a5cee4e7..1a4677ee507 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -26,8 +26,11 @@ class XmlAttribute;
namespace storage::distributor {
-class DistributorStripeInterface;
+class BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
+class DistributorStripeInterface;
+class StripeAccessor;
+class StripeAccessGuard;
class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
@@ -35,41 +38,31 @@ class BucketDBUpdater : public framework::StatusReporter,
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
BucketDBUpdater(DistributorStripeInterface& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
- DistributorComponentRegister& compReg);
+ DistributorComponentRegister& comp_reg,
+ StripeAccessor& stripe_accessor);
~BucketDBUpdater() override;
void flush();
- const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const;
- void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
- bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
- void resendDelayedMessages();
- void storageDistributionChanged();
- vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+
+ void resend_delayed_messages();
+ void storage_distribution_changed(const BucketSpaceDistributionConfigs& configs);
+ void bootstrap_distribution_config(std::shared_ptr<const lib::Distribution>);
+
+ vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const;
+
void print(std::ostream& out, bool verbose, const std::string& indent) const;
const DistributorNodeContext& node_context() const { return _node_ctx; }
DistributorOperationContext& operation_context() { return _op_ctx; }
- /**
- * Returns whether the current PendingClusterState indicates that there has
- * been a transfer of bucket ownership amongst the distributors in the
- * cluster. This method only makes sense to call when _pendingClusterState
- * is active, such as from within a enableClusterState() call.
- */
- bool bucketOwnershipHasChanged() const {
- return ((_pendingClusterState.get() != nullptr)
- && _pendingClusterState->hasBucketOwnershipTransfer());
- }
void set_stale_reads_enabled(bool enabled) noexcept {
_stale_reads_enabled.store(enabled, std::memory_order_relaxed);
}
@@ -77,7 +70,6 @@ public:
return _stale_reads_enabled.load(std::memory_order_relaxed);
}
- OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
private:
class MergeReplyGuard {
public:
@@ -141,126 +133,78 @@ private:
// Transitively invokes Distributor::enableClusterStateBundle
void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
- bool shouldDeferStateEnabling() const noexcept;
- bool hasPendingClusterState() const;
- bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
- bool isPendingClusterStateCompleted() const;
- void processCompletedPendingClusterState();
- void activatePendingClusterState();
- void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
- void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- uint16_t targetNode, BucketListMerger::BucketList& newList);
- void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
- const std::shared_ptr<MergeReplyGuard>& mergeReply);
- void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
- BucketListMerger::BucketList& existing) const;
- void ensureTransitionTimerStarted();
- void completeTransitionTimer();
- void clearReadOnlyBucketRepoDatabases();
+ bool should_defer_state_enabling() const noexcept;
+ bool has_pending_cluster_state() const;
+ bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ bool process_single_bucket_info_reply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void handle_single_bucket_info_failure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ bool is_pending_cluster_state_completed() const;
+ void process_completed_pending_cluster_state(StripeAccessGuard& guard);
+ void activate_pending_cluster_state(StripeAccessGuard& guard);
+ void merge_bucket_info_with_database(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ void convert_bucket_info_to_bucket_list(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList);
+ void send_request_bucket_info(uint16_t node, const document::Bucket& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard);
+ void add_bucket_info_for_node(const BucketDatabase::Entry& e, uint16_t node,
+ BucketListMerger::BucketList& existing) const;
+ void ensure_transition_timer_started();
+ void complete_transition_timer();
/**
     * Adds all buckets in the bucket database that are contained in
     * bucketId, or that contain bucketId, and that have copies on the
     * given node.
*/
- void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
- BucketListMerger::BucketList& existing);
+ void find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing);
    /**
     * Updates the bucket database from the information generated by the
     * given bucket list merger.
     */
- void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
+ void update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
- void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+ void remove_superfluous_buckets(StripeAccessGuard& guard,
+ const lib::ClusterStateBundle& new_state,
+ bool is_distribution_config_change);
- void update_read_snapshot_before_db_pruning();
- void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
- bool is_distribution_config_change);
- void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
- void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
-
- void replyToPreviousPendingClusterStateIfAny();
- void replyToActivationWithActualVersion(
+ void reply_to_previous_pending_cluster_state_if_any();
+ void reply_to_activation_with_actual_version(
const api::ActivateClusterStateVersionCommand& cmd,
uint32_t actualVersion);
- void enableCurrentClusterStateBundleInDistributor();
- void addCurrentStateToClusterStateHistory();
- void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
- void sendAllQueuedBucketRechecks();
+ void enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard);
+ void add_current_state_to_cluster_state_history();
+ void enqueue_recheck_until_pending_state_enabled(uint16_t node, const document::Bucket& bucket);
+ void send_all_queued_bucket_rechecks();
+
+ void propagate_active_state_bundle_internally();
void maybe_inject_simulated_db_pruning_delay();
void maybe_inject_simulated_db_merging_delay();
- /**
- Removes all copies of buckets that are on nodes that are down.
- */
- class MergingNodeRemover : public BucketDatabase::MergingProcessor {
- public:
- MergingNodeRemover(const lib::ClusterState& oldState,
- const lib::ClusterState& s,
- uint16_t localIndex,
- const lib::Distribution& distribution,
- const char* upStates,
- bool track_non_owned_entries);
- ~MergingNodeRemover() override;
-
- Result merge(BucketDatabase::Merger&) override;
- void logRemove(const document::BucketId& bucketId, const char* msg) const;
- bool distributorOwnsBucket(const document::BucketId&) const;
-
- const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
- return _nonOwnedBuckets;
- }
- private:
- void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
-
- bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
- bool storage_node_is_available(uint16_t index) const noexcept;
-
- const lib::ClusterState _oldState;
- const lib::ClusterState _state;
- std::vector<bool> _available_nodes;
- std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
- size_t _removed_buckets;
- size_t _removed_documents;
-
- uint16_t _localIndex;
- const lib::Distribution& _distribution;
- const char* _upStates;
- bool _track_non_owned_entries;
-
- mutable uint64_t _cachedDecisionSuperbucket;
- mutable bool _cachedOwned;
- };
+ // TODO STRIPE remove once distributor component dependencies have been pruned
+ StripeAccessor& _stripe_accessor;
+ lib::ClusterStateBundle _active_state_bundle;
+ std::unique_ptr<DistributorBucketSpaceRepo> _dummy_mutable_bucket_space_repo;
+ std::unique_ptr<DistributorBucketSpaceRepo> _dummy_read_only_bucket_space_repo;
- DistributorStripeComponent _distributorComponent;
+ DistributorStripeComponent _distributor_component;
const DistributorNodeContext& _node_ctx;
DistributorOperationContext& _op_ctx;
DistributorStripeInterface& _distributor_interface;
- std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
- std::map<uint64_t, BucketRequest> _sentMessages;
- std::unique_ptr<PendingClusterState> _pendingClusterState;
+ std::deque<std::pair<framework::MilliSecTime, BucketRequest>> _delayed_requests;
+ std::map<uint64_t, BucketRequest> _sent_messages;
+ std::unique_ptr<PendingClusterState> _pending_cluster_state;
std::list<PendingClusterState::Summary> _history;
DistributorMessageSender& _sender;
- std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
- OutdatedNodesMap _outdatedNodesMap;
- framework::MilliSecTimer _transitionTimer;
+ std::set<EnqueuedBucketRecheck> _enqueued_rechecks;
+ OutdatedNodesMap _outdated_nodes_map;
+ framework::MilliSecTimer _transition_timer;
std::atomic<bool> _stale_reads_enabled;
- using DistributionContexts = std::unordered_map<document::BucketSpace,
- std::shared_ptr<BucketSpaceDistributionContext>,
- document::BucketSpace::hash>;
- DistributionContexts _active_distribution_contexts;
- using DbGuards = std::unordered_map<document::BucketSpace,
- std::shared_ptr<BucketDatabase::ReadGuard>,
- document::BucketSpace::hash>;
- DbGuards _explicit_transition_read_guard;
- mutable std::mutex _distribution_context_mutex;
};
}
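
For orientation, the guard-based transition flow exposed by the header above can be condensed as follows. This is a simplified sketch using only names that appear in this diff; in the actual flow the pruning steps run when the pending cluster state is first created, while merge and activation run later, once all bucket info replies have arrived, and each phase may take its own guard:

    auto guard = _stripe_accessor.rendezvous_and_hold_all();     // park all stripe threads
    guard->update_read_snapshot_before_db_pruning();
    remove_superfluous_buckets(*guard, new_bundle, is_distribution_change);
    guard->update_read_snapshot_after_db_pruning(new_bundle);
    // ... async RequestBucketInfo round trips complete; pending cluster state is done ...
    _pending_cluster_state->merge_into_bucket_databases(*guard);
    guard->enable_cluster_state_bundle(new_bundle);              // top-level first, then stripes
    guard->update_read_snapshot_after_activation(new_bundle);
    guard->clear_read_only_bucket_repo_databases();              // guard released on scope exit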
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index f886640d1f9..0f63de5cd85 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -1,12 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
//
#include "blockingoperationstarter.h"
+#include "bucket_space_distribution_configs.h"
+#include "bucketdbupdater.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributor_status.h"
#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "legacy_single_stripe_accessor.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
@@ -47,16 +50,21 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
ChainedMessageSender* messageSender)
: StorageLink("distributor"),
framework::StatusReporter("distributor", "Distributor"),
+ _comp_reg(compReg),
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler,
manageActiveBucketCopies, *this)),
+ _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
_component(compReg, "distributor"),
+ _bucket_db_updater(),
_distributorStatusDelegate(compReg, *this, *this),
_threadPool(threadPool),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_metricUpdateHook(*this),
- _hostInfoReporter(*this, *this)
+ _hostInfoReporter(*this, *this),
+ _distribution(),
+ _next_distribution()
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
@@ -113,12 +121,12 @@ Distributor::distributor_component() noexcept {
return _stripe->_component;
}
-BucketDBUpdater&
+StripeBucketDBUpdater&
Distributor::bucket_db_updater() {
return _stripe->bucket_db_updater();
}
-const BucketDBUpdater&
+const StripeBucketDBUpdater&
Distributor::bucket_db_updater() const {
return _stripe->bucket_db_updater();
}
@@ -188,6 +196,9 @@ Distributor::onOpen()
void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
_stripe->close();
+ if (_bucket_db_updater) {
+ _bucket_db_updater->flush();
+ }
}
void
@@ -210,18 +221,47 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg)
}
}
+namespace {
+
+bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& msg) noexcept {
+ switch (msg.getType().getId()) {
+ case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
+ return true;
+ case api::MessageType::REQUESTBUCKETINFO_REPLY_ID:
+ // Top-level component should only handle replies for full bucket info fetches.
+ // Bucket-specific requests should go to the stripes that sent them.
+ return dynamic_cast<const api::RequestBucketInfoReply&>(msg).full_bucket_fetch();
+ default:
+ return false;
+ }
+}
+
+}
+
bool
Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
+ // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
+ // regardless of what RPC thread (comm mgr, FRT...) this is called from!
+ if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
+ return msg->callHandler(*_bucket_db_updater, msg);
+ }
+ // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
+ // that covers most operations already...
return _stripe->onDown(msg);
}
bool
Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
+ if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
+ return reply->callHandler(*_bucket_db_updater, reply);
+ }
return _stripe->handleReply(reply);
}
+// TODO STRIPE we need to reintroduce the top-level message queue...
bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
@@ -245,21 +285,42 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
void
Distributor::storageDistributionChanged()
{
- // May happen from any thread.
- _stripe->storageDistributionChanged();
+ if (_bucket_db_updater) {
+ if (!_distribution || (*_component.getDistribution() != *_distribution)) {
+ LOG(debug, "Distribution changed to %s, must re-fetch bucket information",
+ _component.getDistribution()->toString().c_str());
+ _next_distribution = _component.getDistribution(); // FIXME this is not thread safe
+ } else {
+ LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s",
+ _component.getDistribution()->toString().c_str(),
+ _distribution->toString().c_str());
+ }
+ } else {
+ // May happen from any thread.
+ _stripe->storageDistributionChanged();
+ }
}
void
Distributor::enableNextDistribution()
{
- _stripe->enableNextDistribution();
+ if (_next_distribution && _bucket_db_updater) {
+ _distribution = _next_distribution;
+ _next_distribution = std::shared_ptr<lib::Distribution>();
+ auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution);
+ _bucket_db_updater->storage_distribution_changed(new_configs);
+ } else {
+ _stripe->enableNextDistribution();
+ }
}
// TODO STRIPE only used by tests to directly inject new distribution config
+// - actually, also by ctor
void
Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
+ // TODO STRIPE top-level bucket DB updater
_stripe->propagateDefaultDistribution(std::move(distribution));
}
@@ -299,6 +360,9 @@ framework::ThreadWaitInfo
Distributor::doCriticalTick(framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+ if (_bucket_db_updater) {
+ enableNextDistribution();
+ }
// Propagates any new configs down to stripe(s)
enableNextConfig();
_stripe->doCriticalTick(idx);
@@ -309,6 +373,9 @@ Distributor::doCriticalTick(framework::ThreadIndex idx)
framework::ThreadWaitInfo
Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
+ if (_bucket_db_updater) {
+ _bucket_db_updater->resend_delayed_messages();
+ }
// TODO STRIPE stripes need their own thread loops!
_stripe->doNonCriticalTick(idx);
_tickResult = _stripe->_tickResult;
@@ -318,6 +385,11 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
void
Distributor::enableNextConfig()
{
+ // FIXME STRIPE enforce this cannot happen live, only valid for startup config edge
+ if (getConfig().num_distributor_stripes() > 0 && !_bucket_db_updater) {
+ // FIXME STRIPE using the singular stripe here is a temporary Hack McHack Deluxe 3000!
+ _bucket_db_updater = std::make_unique<BucketDBUpdater>(*_stripe, *_stripe, _comp_reg, *_stripe_accessor);
+ }
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
_stripe->enableNextConfig(); // TODO STRIPE avoid redundant call
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index bfffe126b44..9ad6425afe9 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -11,6 +11,7 @@
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
+#include "stripe_bucket_db_updater.h" // TODO this is temporary
#include <vespa/config/config.h>
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
@@ -33,10 +34,12 @@ namespace storage::distributor {
class BlockingOperationStarter;
class BucketPriorityDatabase;
+class BucketDBUpdater;
class DistributorBucketSpaceRepo;
class DistributorStatus;
class DistributorStripe;
class OperationSequencer;
+class LegacySingleStripeAccessor;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
@@ -135,8 +138,8 @@ private:
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
// Accessors used by tests
- BucketDBUpdater& bucket_db_updater();
- const BucketDBUpdater& bucket_db_updater() const;
+ StripeBucketDBUpdater& bucket_db_updater();
+ const StripeBucketDBUpdater& bucket_db_updater() const;
IdealStateManager& ideal_state_manager();
const IdealStateManager& ideal_state_manager() const;
ExternalOperationHandler& external_operation_handler();
@@ -162,16 +165,22 @@ private:
void enableNextDistribution();
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+ DistributorComponentRegister& _comp_reg;
std::shared_ptr<DistributorMetricSet> _metrics;
ChainedMessageSender* _messageSender;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
+ std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;
storage::DistributorComponent _component;
+ std::unique_ptr<BucketDBUpdater> _bucket_db_updater;
StatusReporterDelegate _distributorStatusDelegate;
framework::TickingThreadPool& _threadPool;
framework::ThreadWaitInfo _tickResult;
MetricUpdateHook _metricUpdateHook;
DistributorHostInfoReporter _hostInfoReporter;
+
+ std::shared_ptr<lib::Distribution> _distribution;
+ std::shared_ptr<lib::Distribution> _next_distribution;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
index f4d5bd6f5aa..558cbada31f 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
@@ -80,6 +80,7 @@ public:
}
void set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state);
+ bool has_pending_cluster_state() const noexcept { return static_cast<bool>(_pending_cluster_state); }
const lib::ClusterState& get_pending_cluster_state() const noexcept { return *_pending_cluster_state; }
/**
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index e41c7940a0d..906129ebf96 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -478,6 +478,8 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace,
}
}
+// TODO STRIPE must only be called when operating in legacy single stripe mode!
+// In other cases, distribution config switching is controlled by top-level distributor, not via framework(tm).
void
DistributorStripe::enableNextDistribution()
{
@@ -485,10 +487,12 @@ DistributorStripe::enableNextDistribution()
_distribution = _nextDistribution;
propagateDefaultDistribution(_distribution);
_nextDistribution = std::shared_ptr<lib::Distribution>();
+ // TODO conditional on whether top-level DB updater is in charge
_bucketDBUpdater.storageDistributionChanged();
}
}
+// TODO STRIPE must be invoked by top-level bucket db updater probably
void
DistributorStripe::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
@@ -500,6 +504,19 @@ DistributorStripe::propagateDefaultDistribution(
}
}
+// Only called when stripe is in rendezvous freeze
+void
+DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+ auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space());
+ auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space());
+ assert(default_distr && global_distr);
+
+ for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
+ repo->get(document::FixedBucketSpaces::default_space()).setDistribution(default_distr);
+ repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
+ }
+}
+
void
DistributorStripe::propagateClusterStates()
{
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 10b3f54d834..4a43b93354e 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -3,7 +3,6 @@
#pragma once
#include "bucket_spaces_stats_provider.h"
-#include "bucketdbupdater.h"
#include "distributor_host_info_reporter.h"
#include "distributor_stripe_interface.h"
#include "externaloperationhandler.h"
@@ -11,6 +10,8 @@
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
+#include "stripe_access_guard.h"
+#include "stripe_bucket_db_updater.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
@@ -98,7 +99,7 @@ public:
/**
* Invoked when a pending cluster state for a distribution (config)
- * change has been enabled. An invocation of storageDistributionChanged
+ * change has been enabled. An invocation of storage_distribution_changed
* will eventually cause this method to be called, assuming the pending
* cluster state completed successfully.
*/
@@ -169,8 +170,8 @@ public:
return *_bucketIdHasher;
}
- BucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
- const BucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
+ StripeBucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
+ const StripeBucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
IdealStateManager& ideal_state_manager() { return _idealStateManager; }
const IdealStateManager& ideal_state_manager() const { return _idealStateManager; }
ExternalOperationHandler& external_operation_handler() { return _externalOperationHandler; }
@@ -198,6 +199,7 @@ private:
friend class DistributorTestUtil;
friend class MetricUpdateHook;
friend class Distributor;
+ friend class LegacySingleStripeAccessGuard;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
@@ -251,9 +253,10 @@ private:
bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg,
Operation::SP& operation);
- void enableNextDistribution();
- void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+ void enableNextDistribution(); // TODO STRIPE remove once legacy is gone
+ void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone
void propagateClusterStates();
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs);
BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
template <typename NodeFunctor>
@@ -276,7 +279,7 @@ private:
std::unique_ptr<OperationSequencer> _operation_sequencer;
PendingMessageTracker _pendingMessageTracker;
- BucketDBUpdater _bucketDBUpdater;
+ StripeBucketDBUpdater _bucketDBUpdater;
StatusReporterDelegate _distributorStatusDelegate;
StatusReporterDelegate _bucketDBStatusDelegate;
IdealStateManager _idealStateManager;
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
new file mode 100644
index 00000000000..04f210205c3
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
@@ -0,0 +1,88 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "legacy_single_stripe_accessor.h"
+#include "distributor_stripe.h"
+
+namespace storage::distributor {
+
+LegacySingleStripeAccessGuard::LegacySingleStripeAccessGuard(LegacySingleStripeAccessor& accessor,
+ DistributorStripe& stripe)
+ : _accessor(accessor),
+ _stripe(stripe)
+{}
+
+LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() {
+ _accessor.mark_guard_released();
+}
+
+void LegacySingleStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+ _stripe.update_distribution_config(new_configs);
+}
+
+void LegacySingleStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
+ _stripe.getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+ // TODO STRIPE also read only repo?
+}
+
+void LegacySingleStripeAccessGuard::clear_pending_cluster_state_bundle() {
+ _stripe.getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+ // TODO STRIPE also read only repo?
+}
+
+void LegacySingleStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
+ _stripe.enableClusterStateBundle(new_state);
+}
+
+void LegacySingleStripeAccessGuard::notify_distribution_change_enabled() {
+ _stripe.notifyDistributionChangeEnabled();
+}
+
+PotentialDataLossReport
+LegacySingleStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ return _stripe.bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+}
+
+void
+LegacySingleStripeAccessGuard::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ _stripe.bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
+}
+
+void LegacySingleStripeAccessGuard::update_read_snapshot_before_db_pruning() {
+ _stripe.bucket_db_updater().update_read_snapshot_before_db_pruning();
+}
+
+void LegacySingleStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ _stripe.bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+}
+
+void LegacySingleStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ _stripe.bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+}
+
+void LegacySingleStripeAccessGuard::clear_read_only_bucket_repo_databases() {
+ _stripe.bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+}
+
+std::unique_ptr<StripeAccessGuard> LegacySingleStripeAccessor::rendezvous_and_hold_all() {
+ // For sanity checking during development.
+ assert(!_guard_held);
+ _guard_held = true;
+ return std::make_unique<LegacySingleStripeAccessGuard>(*this, _stripe);
+}
+
+void LegacySingleStripeAccessor::mark_guard_released() {
+ assert(_guard_held);
+ _guard_held = false;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
new file mode 100644
index 00000000000..4f8cceb0a70
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
@@ -0,0 +1,67 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "stripe_access_guard.h"
+
+namespace storage::distributor {
+
+class DistributorStripe;
+class LegacySingleStripeAccessor;
+
+/**
+ * Very simple stripe access guard which expects the caller and its single stripe to run in the
+ * same thread. This means there's no actual striping of operations or any thread synchronization
+ * performed. Only intended as a stop-gap while we have legacy stripe behavior.
+ */
+class LegacySingleStripeAccessGuard : public StripeAccessGuard {
+ LegacySingleStripeAccessor& _accessor;
+ DistributorStripe& _stripe;
+public:
+ LegacySingleStripeAccessGuard(LegacySingleStripeAccessor& accessor,
+ DistributorStripe& stripe);
+ ~LegacySingleStripeAccessGuard() override;
+
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
+ void clear_pending_cluster_state_bundle() override;
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override;
+ void notify_distribution_change_enabled() override;
+
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) override;
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) override;
+
+ void update_read_snapshot_before_db_pruning() override;
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
+ void clear_read_only_bucket_repo_databases() override;
+};
+
+/**
+ * Impl of StripeAccessor which creates LegacySingleStripeAccessGuards bound to a single stripe.
+ */
+class LegacySingleStripeAccessor : public StripeAccessor {
+ DistributorStripe& _stripe;
+ bool _guard_held;
+
+ friend class LegacySingleStripeAccessGuard;
+public:
+ explicit LegacySingleStripeAccessor(DistributorStripe& stripe)
+ : _stripe(stripe),
+ _guard_held(false)
+ {}
+ ~LegacySingleStripeAccessor() override = default;
+
+ std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() override;
+private:
+ void mark_guard_released();
+};
+
+}
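
The intended call pattern for the accessor/guard pair, as exercised by `simulate_cluster_state_bundle_activation` earlier in this diff, is roughly the following (a sketch; surrounding logic omitted):

    auto guard = _stripe_accessor.rendezvous_and_hold_all(); // asserts against nested guards
    guard->set_pending_cluster_state_bundle(pending_state);  // safe: no stripe thread runs concurrently
    // ... any other cross-stripe mutations ...
    // Destroying the guard invokes mark_guard_released(), allowing a new guard to be taken.

For this legacy single-stripe accessor the "rendezvous" is pure bookkeeping, since caller and stripe share a thread; the interface exists so a multi-stripe implementation can later substitute a real thread rendezvous without changing callers.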
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index af6223afe28..8e54cfcb27d 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -4,6 +4,7 @@
#include "clusterinformation.h"
#include "pendingclusterstate.h"
#include "distributor_bucket_space.h"
+#include "stripe_access_guard.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <algorithm>
@@ -18,13 +19,15 @@ using lib::NodeType;
using lib::NodeState;
PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
+ document::BucketSpace bucket_space,
DistributorBucketSpace &distributorBucketSpace,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp)
- : _entries(),
+ : _bucket_space(bucket_space),
+ _entries(),
_iter(0),
_removedBuckets(),
_missingEntries(),
@@ -53,7 +56,7 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus
PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() = default;
PendingBucketSpaceDbTransition::Range
-PendingBucketSpaceDbTransition::skipAllForSameBucket()
+PendingBucketSpaceDbTransition::DbMerger::skipAllForSameBucket()
{
Range r(_iter, _iter);
@@ -68,7 +71,7 @@ PendingBucketSpaceDbTransition::skipAllForSameBucket()
}
std::vector<BucketCopy>
-PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range)
{
std::vector<BucketCopy> copiesToAdd;
for (uint32_t i = range.first; i < range.second; ++i) {
@@ -83,28 +86,21 @@ PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Ent
}
void
-PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::insertInfo(BucketDatabase::Entry& info, const Range& range)
{
std::vector<BucketCopy> copiesToAddOrUpdate(
getCopiesThatAreNewOrAltered(info, range));
- const auto &dist(_distributorBucketSpace.getDistribution());
std::vector<uint16_t> order(
- dist.getIdealStorageNodes(
- _newClusterState,
+ _distribution.getIdealStorageNodes(
+ _new_state,
_entries[range.first].bucket_id(),
- _clusterInfo->getStorageUpStates()));
+ _storage_up_states));
info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER);
}
-std::string
-PendingBucketSpaceDbTransition::requestNodesToString()
-{
- return _pendingClusterState.requestNodesToString();
-}
-
bool
-PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId)
+PendingBucketSpaceDbTransition::DbMerger::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId)
{
bool updated = false;
for (uint32_t i = 0; i < e->getNodeCount();) {
@@ -116,7 +112,7 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat
// mark a single remaining replica as trusted even though there might
// be one or more additional replicas pending merge into the database.
if (nodeIsOutdated(entryNode)
- && (info.getTimestamp() < _creationTimestamp)
+ && (info.getTimestamp() < _creation_timestamp)
&& e->removeNode(entryNode, TrustedUpdate::DEFER))
{
LOG(spam,
@@ -133,21 +129,21 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat
}
bool
-PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const
+PendingBucketSpaceDbTransition::DbMerger::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const
{
return ((_iter < _entries.size())
&& (_entries[_iter].bucket_key < bucket_key));
}
bool
-PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const
+PendingBucketSpaceDbTransition::DbMerger::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const
{
return _iter < _entries.size() && _entries[_iter].bucket_key == bucket_key;
}
using MergeResult = BucketDatabase::MergingProcessor::Result;
-MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger) {
+MergeResult PendingBucketSpaceDbTransition::DbMerger::merge(BucketDatabase::Merger& merger) {
const uint64_t bucket_key = merger.bucket_key();
while (databaseIteratorHasPassedBucketInfoIterator(bucket_key)) {
@@ -158,9 +154,7 @@ MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger
auto& e = merger.current_entry();
document::BucketId bucketId(e.getBucketId());
- LOG(spam,
- "Before merging info from nodes [%s], bucket %s had info %s",
- requestNodesToString().c_str(),
+ LOG(spam, "Before merging info, bucket %s had info %s",
bucketId.toString().c_str(),
e.getBucketInfo().toString().c_str());
@@ -185,14 +179,14 @@ MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger
return MergeResult::KeepUnchanged;
}
-void PendingBucketSpaceDbTransition::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) {
+void PendingBucketSpaceDbTransition::DbMerger::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) {
while (_iter < _entries.size()) {
addToInserter(inserter, skipAllForSameBucket());
}
}
void
-PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::addToMerger(BucketDatabase::Merger& merger, const Range& range)
{
const auto bucket_id = _entries[range.first].bucket_id();
LOG(spam, "Adding new bucket %s with %d copies",
@@ -202,16 +196,14 @@ PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, cons
BucketDatabase::Entry e(bucket_id, BucketInfo());
insertInfo(e, range);
if (e->getLastGarbageCollectionTime() == 0) {
- e->setLastGarbageCollectionTime(
- framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
+ e->setLastGarbageCollectionTime(framework::MicroSecTime(_creation_timestamp).getSeconds().getTime());
}
e.getBucketInfo().updateTrusted();
merger.insert_before_current(bucket_id, e);
}
void
-PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range)
{
// TODO dedupe
const auto bucket_id = _entries[range.first].bucket_id();
@@ -222,20 +214,32 @@ PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter&
BucketDatabase::Entry e(bucket_id, BucketInfo());
insertInfo(e, range);
if (e->getLastGarbageCollectionTime() == 0) {
- e->setLastGarbageCollectionTime(
- framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
+ e->setLastGarbageCollectionTime(framework::MicroSecTime(_creation_timestamp).getSeconds().getTime());
}
e.getBucketInfo().updateTrusted();
inserter.insert_at_end(bucket_id, e);
}
+// TODO STRIPE remove legacy single stripe stuff
void
PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
{
BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
std::sort(_entries.begin(), _entries.end());
- db.merge(*this);
+
+ const auto& dist = _distributorBucketSpace.getDistribution();
+ DbMerger merger(_creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
+
+ db.merge(merger);
+}
+
+void
+PendingBucketSpaceDbTransition::merge_into_bucket_databases(StripeAccessGuard& guard)
+{
+ std::sort(_entries.begin(), _entries.end());
+ const auto& dist = _distributorBucketSpace.getDistribution();
+ guard.merge_entries_into_db(_bucket_space, _creationTimestamp, dist, _newClusterState,
+ _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
}
void
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
index 232f1186879..86d2c89bf23 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -3,25 +3,30 @@
#include "pending_bucket_space_db_transition_entry.h"
#include "outdated_nodes.h"
+#include <vespa/document/bucket/bucketspace.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <unordered_map>
namespace storage::api { class RequestBucketInfoReply; }
-namespace storage::lib { class ClusterState; class State; }
+namespace storage::lib {
+class ClusterState;
+class Distribution;
+class State;
+}
namespace storage::distributor {
class ClusterInformation;
class PendingClusterState;
class DistributorBucketSpace;
+class StripeAccessGuard;
/**
 * Class used by PendingClusterState to track request bucket info
 * reply results within a bucket space and to apply them to the distributor
 * bucket database when switching to the pending cluster state.
*/
-class PendingBucketSpaceDbTransition : public BucketDatabase::MergingProcessor
-{
+class PendingBucketSpaceDbTransition {
public:
using Entry = dbtransition::Entry;
using EntryList = std::vector<Entry>;
@@ -29,6 +34,7 @@ public:
private:
using Range = std::pair<uint32_t, uint32_t>;
+ document::BucketSpace _bucket_space;
EntryList _entries;
uint32_t _iter;
std::vector<document::BucketId> _removedBuckets;
@@ -42,45 +48,16 @@ private:
// may be down and thus cannot get a request.
OutdatedNodes _outdatedNodes;
- const lib::ClusterState &_prevClusterState;
- const lib::ClusterState &_newClusterState;
+ const lib::ClusterState& _prevClusterState;
+ const lib::ClusterState& _newClusterState;
const api::Timestamp _creationTimestamp;
- const PendingClusterState &_pendingClusterState;
- DistributorBucketSpace &_distributorBucketSpace;
+ const PendingClusterState& _pendingClusterState;
+ DistributorBucketSpace& _distributorBucketSpace;
uint16_t _distributorIndex;
bool _bucketOwnershipTransfer;
std::unordered_map<uint16_t, size_t> _rejectedRequests;
std::unordered_map<uint16_t, size_t> _failed_requests; // Also includes rejections
- BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override;
- void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override;
-
- /**
- * Skips through all entries for the same bucket and returns
- * the range in the entry list for which they were found.
- * The range is [from, to>
- */
- Range skipAllForSameBucket();
-
- std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
- void insertInfo(BucketDatabase::Entry& info, const Range& range);
- void addToMerger(BucketDatabase::Merger& merger, const Range& range);
- void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range);
-
- bool nodeIsOutdated(uint16_t node) const {
- return (_outdatedNodes.find(node) != _outdatedNodes.end());
- }
-
- // Returns whether at least one replica was removed from the entry.
- // Does NOT implicitly update trusted status on remaining replicas; caller must do
- // this explicitly.
- bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
-
- // Helper methods for iterating over _entries
- bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const;
- bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const;
- std::string requestNodesToString();
-
bool distributorChanged();
static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw);
bool storageNodeUpInNewState(uint16_t node) const;
@@ -94,17 +71,75 @@ private:
void updateSetOfNodesThatAreOutdated();
public:
+ // Abstracts away the details of how an entry list gathered from content nodes
+ // is actually diffed and merged into a database.
+ class DbMerger : public BucketDatabase::MergingProcessor {
+ api::Timestamp _creation_timestamp;
+ const lib::Distribution& _distribution;
+ const lib::ClusterState& _new_state;
+ const char* _storage_up_states;
+ const std::unordered_set<uint16_t>& _outdated_nodes; // TODO hash_set
+ const std::vector<dbtransition::Entry>& _entries;
+ uint32_t _iter;
+ public:
+ DbMerger(api::Timestamp creation_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+ : _creation_timestamp(creation_timestamp),
+ _distribution(distribution),
+ _new_state(new_state),
+ _storage_up_states(storage_up_states),
+ _outdated_nodes(outdated_nodes),
+ _entries(entries),
+ _iter(0)
+ {}
+ ~DbMerger() override = default;
+
+ BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override;
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override;
+
+ /**
+ * Skips through all entries for the same bucket and returns
+ * the range in the entry list for which they were found.
+ * The range is [from, to>
+ */
+ Range skipAllForSameBucket();
+
+ std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
+ void insertInfo(BucketDatabase::Entry& info, const Range& range);
+ void addToMerger(BucketDatabase::Merger& merger, const Range& range);
+ void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range);
+
+ // Returns whether at least one replica was removed from the entry.
+ // Does NOT implicitly update trusted status on remaining replicas; caller must do
+ // this explicitly.
+ bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
+
+ // Helper methods for iterating over _entries
+ bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const;
+ bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const;
+
+ bool nodeIsOutdated(uint16_t node) const {
+ return (_outdated_nodes.find(node) != _outdated_nodes.end());
+ }
+ };
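+
+ // Illustrative driving sketch (the authoritative call site added by this
+ // patch is StripeBucketDBUpdater::merge_entries_into_db): a DbMerger is
+ // constructed over the gathered entries and handed to the bucket database,
+ // which calls back into merge() / insert_remaining_at_end():
+ //
+ //   DbMerger merger(ts, distribution, new_state, up_states, outdated_nodes, entries);
+ //   bucket_db.merge(merger);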
+
PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
+ document::BucketSpace bucket_space,
DistributorBucketSpace &distributorBucketSpace,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp);
- ~PendingBucketSpaceDbTransition() override;
+ ~PendingBucketSpaceDbTransition();
// Merges all the results with the corresponding bucket database.
void mergeIntoBucketDatabase();
+ void merge_into_bucket_databases(StripeAccessGuard& guard);
// Adds the info from the reply to our list of information.
void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node);
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index a7fd5a5af53..9e811ea9d23 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -84,9 +84,9 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged,
auto onItr = outdatedNodesMap.find(elem.first);
const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second;
auto pendingTransition =
- std::make_unique<PendingBucketSpaceDbTransition>
- (*this, *elem.second, distributionChanged, outdatedNodes,
- _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp);
+ std::make_unique<PendingBucketSpaceDbTransition>(
+ *this, elem.first, *elem.second, distributionChanged, outdatedNodes,
+ _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp);
if (pendingTransition->getBucketOwnershipTransfer()) {
_bucketOwnershipTransfer = true;
}
@@ -331,6 +331,14 @@ PendingClusterState::mergeIntoBucketDatabases()
}
void
+PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard)
+{
+ for (auto &elem : _pendingTransitions) {
+ elem.second->merge_into_bucket_databases(guard);
+ }
+}
+
+void
PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const
{
using namespace vespalib::xml;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 42b7bf0dcf2..af0c85fab95 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -18,6 +18,7 @@ namespace storage::distributor {
class DistributorMessageSender;
class PendingBucketSpaceDbTransition;
class DistributorBucketSpaceRepo;
+class StripeAccessGuard;
/**
* Class used by BucketDBUpdater to track request bucket info
@@ -146,6 +147,8 @@ public:
* Merges all the results with the corresponding bucket databases.
*/
void mergeIntoBucketDatabases();
+ void merge_into_bucket_databases(StripeAccessGuard& guard);
+
// Get pending transition for a specific bucket space. Only used by unit test.
PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace);
diff --git a/storage/src/vespa/storage/distributor/potential_data_loss_report.h b/storage/src/vespa/storage/distributor/potential_data_loss_report.h
new file mode 100644
index 00000000000..96abd787649
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/potential_data_loss_report.h
@@ -0,0 +1,22 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+
+namespace storage::distributor {
+
+/**
+ * Represents the amount of data a distributor reasons _may_ have become unavailable
+ * due to all bucket replicas no longer being present.
+ */
+struct PotentialDataLossReport {
+ size_t buckets = 0;
+ size_t documents = 0;
+
+ void merge(const PotentialDataLossReport& other) noexcept {
+ buckets += other.buckets;
+ documents += other.documents;
+ }
+};
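+
+// Sketch of the intended aggregation pattern (assumed usage, not mandated by
+// this header): per-bucket-space reports produced by DB pruning are merged
+// into a node-wide total, e.g.:
+//
+//   PotentialDataLossReport total;
+//   for (auto space : bucket_spaces) {
+//       total.merge(guard.remove_superfluous_buckets(space, new_state, false));
+//   }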
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
new file mode 100644
index 00000000000..8d848450cd2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -0,0 +1,68 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "bucket_space_distribution_configs.h"
+#include "pending_bucket_space_db_transition_entry.h"
+#include "potential_data_loss_report.h"
+#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/storageapi/defs.h>
+#include <memory>
+#include <unordered_set> // TODO use hash_set instead
+
+namespace storage::lib {
+class ClusterState;
+class ClusterStateBundle;
+class Distribution;
+}
+
+namespace storage::distributor {
+
+/**
+ * A stripe access guard guarantees that the holder of a guard can access underlying
+ * stripes via it in a thread safe manner. In particular, while any access guard is
+ * held, all stripe threads must be in a safe rendezvous location with no race conditions
+ * possible. When a guard goes out of scope, the stripe threads may resume operation.
+ */
+class StripeAccessGuard {
+public:
+ virtual ~StripeAccessGuard() = default;
+
+ virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+ virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
+ virtual void clear_pending_cluster_state_bundle() = 0;
+ virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void notify_distribution_change_enabled() = 0;
+
+ virtual PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) = 0;
+ virtual void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) = 0;
+
+ virtual void update_read_snapshot_before_db_pruning() = 0;
+ virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
+ virtual void clear_read_only_bucket_repo_databases() = 0;
+
+};
+
+/**
+ * Provides a factory for guards that protect access to underlying stripes.
+ *
+ * Important: at most one StripeAccessGuard may exist at any given time. Creating
+ * concurrent guards is undefined behavior.
+ */
+class StripeAccessor {
+public:
+ virtual ~StripeAccessor() = default;
+
+ virtual std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() = 0;
+ // TODO also accessor for a single particular stripe?
+};
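+
+// Minimal usage sketch (hypothetical caller; function and variable names are
+// illustrative, not part of this patch):
+//
+//   void apply_cross_stripe_state(StripeAccessor& accessor,
+//                                 const lib::ClusterStateBundle& pending) {
+//       auto guard = accessor.rendezvous_and_hold_all(); // stripes parked here
+//       guard->set_pending_cluster_state_bundle(pending);
+//       guard->update_read_snapshot_before_db_pruning();
+//       // ... prune superfluous buckets and merge fetched entries via guard ...
+//   } // guard goes out of scope; stripe threads resume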
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
new file mode 100644
index 00000000000..cf1781e0ae8
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
@@ -0,0 +1,1132 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "stripe_bucket_db_updater.h"
+#include "bucket_db_prune_elision.h"
+#include "bucket_space_distribution_context.h"
+#include "distributor.h"
+#include "distributor_bucket_space.h"
+#include "distributormetricsset.h"
+#include "pending_bucket_space_db_transition.h"
+#include "potential_data_loss_report.h"
+#include "simpleclusterinformation.h"
+#include "stripe_access_guard.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/util/xmlstream.h>
+#include <thread>
+
+#include <vespa/log/bufferedlogger.h>
+LOG_SETUP(".distributor.stripe_bucket_db_updater");
+
+using storage::lib::Node;
+using storage::lib::NodeType;
+using document::BucketSpace;
+
+namespace storage::distributor {
+
+StripeBucketDBUpdater::StripeBucketDBUpdater(DistributorStripeInterface& owner,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ DistributorMessageSender& sender,
+ DistributorComponentRegister& compReg)
+ : framework::StatusReporter("bucketdb", "Bucket DB Updater"),
+ _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
+ _node_ctx(_distributorComponent),
+ _op_ctx(_distributorComponent),
+ _distributor_interface(_distributorComponent.getDistributor()),
+ _delayedRequests(),
+ _sentMessages(),
+ _pendingClusterState(),
+ _history(),
+ _sender(sender),
+ _enqueuedRechecks(),
+ _outdatedNodesMap(),
+ _transitionTimer(_node_ctx.clock()),
+ _stale_reads_enabled(false),
+ _active_distribution_contexts(),
+ _explicit_transition_read_guard(),
+ _distribution_context_mutex()
+{
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ _active_distribution_contexts.emplace(
+ elem.first,
+ BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
+ _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
+ }
+}
+
+StripeBucketDBUpdater::~StripeBucketDBUpdater() = default;
+
+OperationRoutingSnapshot StripeBucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ const auto bucket_space = bucket.getBucketSpace();
+ std::lock_guard lock(_distribution_context_mutex);
+ auto active_state_iter = _active_distribution_contexts.find(bucket_space);
+ assert(active_state_iter != _active_distribution_contexts.cend());
+ auto& state = *active_state_iter->second;
+ if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId());
+ if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const auto& space_repo = bucket_present_in_mutable_db
+ ? _op_ctx.bucket_space_repo()
+ : _op_ctx.read_only_bucket_space_repo();
+ auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
+ assert(existing_guard_iter != _explicit_transition_read_guard.cend());
+ auto db_guard = existing_guard_iter->second
+                        ? existing_guard_iter->second
+ : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard();
+ return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo);
+}
+
+void
+StripeBucketDBUpdater::flush()
+{
+ for (auto & entry : _sentMessages) {
+ // Cannot sendDown MergeBucketReplies during flushing, since
+ // all lower links have been closed
+ if (entry.second._mergeReplyGuard) {
+ entry.second._mergeReplyGuard->resetReply();
+ }
+ }
+ _sentMessages.clear();
+}
+
+void
+StripeBucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& indent) const
+{
+ (void) verbose; (void) indent;
+ out << "BucketDBUpdater";
+}
+
+bool
+StripeBucketDBUpdater::shouldDeferStateEnabling() const noexcept
+{
+ return stale_reads_enabled();
+}
+
+bool
+StripeBucketDBUpdater::hasPendingClusterState() const
+{
+ // Defer to the repo instead of checking our own internal pending cluster state,
+ // as we won't have one if the top level distributor handles this for all stripes.
+ // But if we're operating in "legacy" mode with this stripe bucket DB updater as
+ // the authoritative source, there should always be an internal pending cluster
+ // state if the repo is tagged as having one as well.
+ // Since we also set a pending cluster state bundle when triggered by a distribution
+ // config change, this check also covers that case.
+ return _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).has_pending_cluster_state();
+}
+
+const lib::ClusterState*
+StripeBucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const {
+ auto& distr_space = _op_ctx.bucket_space_repo().get(space);
+ return (distr_space.has_pending_cluster_state()
+ ? &distr_space.get_pending_cluster_state()
+ : nullptr);
+}
+
+void
+StripeBucketDBUpdater::sendRequestBucketInfo(
+ uint16_t node,
+ const document::Bucket& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
+{
+ if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
+ return;
+ }
+
+ std::vector<document::BucketId> buckets;
+ buckets.push_back(bucket.getBucketId());
+
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets);
+
+ LOG(debug,
+ "Sending request bucket info command %" PRIu64 " for "
+ "bucket %s to node %u",
+ msg->getMsgId(),
+ bucket.toString().c_str(),
+ node);
+
+ msg->setPriority(50);
+ msg->setAddress(_node_ctx.node_address(node));
+
+ _sentMessages[msg->getMsgId()] =
+ BucketRequest(node, _op_ctx.generate_unique_timestamp(),
+ bucket, mergeReplyGuard);
+ _sender.sendCommand(msg);
+}
+
+void
+StripeBucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
+ const document::Bucket& bucket)
+{
+ sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
+}
+
+namespace {
+
+class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor {
+ using NewEntries = std::vector<BucketDatabase::Entry>;
+ NewEntries::const_iterator _current;
+ const NewEntries::const_iterator _last;
+public:
+ explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries)
+ : _current(new_entries.cbegin()),
+ _last(new_entries.cend())
+ {}
+
+ Result merge(BucketDatabase::Merger& m) override {
+ const uint64_t key_to_insert = m.bucket_key();
+ uint64_t key_at_cursor = 0;
+ while (_current != _last) {
+ key_at_cursor = _current->getBucketId().toKey();
+ if (key_at_cursor >= key_to_insert) {
+ break;
+ }
+ m.insert_before_current(_current->getBucketId(), *_current);
+ ++_current;
+ }
+ if ((_current != _last) && (key_at_cursor == key_to_insert)) {
+ // If we encounter a bucket that already exists, replace value wholesale.
+ // Don't try to cleverly merge replicas, as the values we currently hold
+ // in the read-only DB may be stale.
+ // Note that this case shouldn't really happen, since we only add previously
+ // owned buckets to the read-only DB, and subsequent adds to a non-empty DB
+ // can only happen for state preemptions. Since ownership is not regained
+ // before a state is stable, a bucket is only added once. But we handle it
+ // anyway in case this changes at some point in the future.
+ m.current_entry() = *_current;
+ return Result::Update;
+ }
+ return Result::KeepUnchanged;
+ }
+
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override {
+ for (; _current != _last; ++_current) {
+ inserter.insert_at_end(_current->getBucketId(), *_current);
+ }
+ }
+};
+
+}
+
+void
+StripeBucketDBUpdater::removeSuperfluousBuckets(
+ const lib::ClusterStateBundle& newState,
+ bool is_distribution_config_change)
+{
+ const bool move_to_read_only_db = shouldDeferStateEnabling();
+ const char* up_states = _op_ctx.storage_node_up_states();
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ const auto& newDistribution(elem.second->getDistribution());
+ const auto& oldClusterState(elem.second->getClusterState());
+ const auto& new_cluster_state = newState.getDerivedClusterState(elem.first);
+
+ // Running a full DB sweep is expensive, so if the cluster state transition does
+ // not actually indicate that buckets should possibly be removed, we elide it entirely.
+ if (!is_distribution_config_change
+ && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states))
+ {
+ LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'",
+ document::FixedBucketSpaces::to_string(elem.first).data(),
+ oldClusterState.toString().c_str(), new_cluster_state->toString().c_str());
+ continue;
+ }
+
+ auto& bucketDb(elem.second->getBucketDatabase());
+ auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase());
+
+ // Remove all buckets not belonging to this distributor, or
+ // being on storage nodes that are no longer up.
+ MergingNodeRemover proc(
+ oldClusterState,
+ *new_cluster_state,
+ _node_ctx.node_index(),
+ newDistribution,
+ up_states,
+ move_to_read_only_db);
+
+ bucketDb.merge(proc);
+ if (move_to_read_only_db) {
+ ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
+ readOnlyDb.merge(read_only_merger);
+ }
+ maybe_inject_simulated_db_pruning_delay();
+ }
+}
+
+PotentialDataLossReport
+StripeBucketDBUpdater::remove_superfluous_buckets(
+ document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ (void)is_distribution_change; // TODO remove if not needed
+ const bool move_to_read_only_db = shouldDeferStateEnabling();
+ const char* up_states = _op_ctx.storage_node_up_states();
+
+ auto& s = _op_ctx.bucket_space_repo().get(bucket_space);
+ const auto& new_distribution = s.getDistribution();
+ const auto& old_cluster_state = s.getClusterState();
+ // Elision of DB sweep is done at a higher level, so we don't have to do that here.
+ auto& bucket_db = s.getBucketDatabase();
+ auto& read_only_db = _op_ctx.read_only_bucket_space_repo().get(bucket_space).getBucketDatabase();
+
+ // Remove all buckets not belonging to this distributor, or
+ // being on storage nodes that are no longer up.
+ MergingNodeRemover proc(
+ old_cluster_state,
+ new_state,
+ _node_ctx.node_index(),
+ new_distribution,
+ up_states,
+ move_to_read_only_db);
+
+ bucket_db.merge(proc);
+ if (move_to_read_only_db) {
+ ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
+ read_only_db.merge(read_only_merger);
+ }
+ PotentialDataLossReport report;
+ report.buckets = proc.removed_buckets();
+ report.documents = proc.removed_documents();
+ return report;
+}
+
+void
+StripeBucketDBUpdater::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ auto& s = _op_ctx.bucket_space_repo().get(bucket_space);
+ auto& bucket_db = s.getBucketDatabase();
+
+ PendingBucketSpaceDbTransition::DbMerger merger(gathered_at_timestamp, distribution, new_state,
+ storage_up_states, outdated_nodes, entries);
+ bucket_db.merge(merger);
+}
+
+namespace {
+
+void maybe_sleep_for(std::chrono::milliseconds ms) {
+ if (ms.count() > 0) {
+ std::this_thread::sleep_for(ms);
+ }
+}
+
+}
+
+void
+StripeBucketDBUpdater::maybe_inject_simulated_db_pruning_delay() {
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency());
+}
+
+void
+StripeBucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency());
+}
+
+void
+StripeBucketDBUpdater::ensureTransitionTimerStarted()
+{
+ // Don't overwrite start time if we're already processing a state, as
+ // that will make transition times appear artificially low.
+ if (!hasPendingClusterState()) {
+ _transitionTimer = framework::MilliSecTimer(
+ _node_ctx.clock());
+ }
+}
+
+void
+StripeBucketDBUpdater::completeTransitionTimer()
+{
+ _distributor_interface.getMetrics()
+ .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble());
+}
+
+void
+StripeBucketDBUpdater::clearReadOnlyBucketRepoDatabases()
+{
+ for (auto& space : _op_ctx.read_only_bucket_space_repo()) {
+ space.second->getBucketDatabase().clear();
+ }
+}
+
+void
+StripeBucketDBUpdater::storageDistributionChanged()
+{
+ ensureTransitionTimerStarted();
+
+ removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true);
+
+ auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+ _node_ctx.node_index(),
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
+ _pendingClusterState = PendingClusterState::createForDistributionChange(
+ _node_ctx.clock(),
+ std::move(clusterInfo),
+ _sender,
+ _op_ctx.bucket_space_repo(),
+ _op_ctx.generate_unique_timestamp());
+ _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+}
+
+void
+StripeBucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
+{
+ if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
+ _distributor_interface.getMessageSender().sendUp(
+ std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
+ }
+}
+
+void
+StripeBucketDBUpdater::replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion)
+{
+ auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
+ reply->setActualVersion(actualVersion);
+ _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_before_db_pruning() {
+ std::lock_guard lock(_distribution_context_mutex);
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ // At this point, we're still operating with a distribution context _without_ a
+ // pending state, i.e. anyone using the context will expect to find buckets
+        // in the DB that correspond to how the database looked prior to pruning
+ // buckets from the DB. To ensure this is not violated, take a snapshot of the
+ // _mutable_ DB and expose this. This snapshot only lives until we atomically
+ // flip to expose a distribution context that includes the new, pending state.
+ // At that point, the read-only DB is known to contain the buckets that have
+ // been pruned away, so we can release the mutable DB snapshot safely.
+ // TODO test for, and handle, state preemption case!
+ _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
+ }
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto old_default_state = _op_ctx.bucket_space_repo().get(
+ document::FixedBucketSpaces::default_space()).cluster_state_sp();
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto old_cluster_state = elem.second->cluster_state_sp();
+ auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ BucketSpaceDistributionContext::make_state_transition(
+ std::move(old_cluster_state),
+ old_default_state,
+ std::move(new_cluster_state),
+ std::move(new_distribution),
+ _node_ctx.node_index()));
+ // We can now remove the explicit mutable DB snapshot, as the buckets that have been
+ // pruned away are visible in the read-only DB.
+ _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
+ }
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ BucketSpaceDistributionContext::make_stable_state(
+ std::move(new_cluster_state),
+ default_cluster_state,
+ std::move(new_distribution),
+ _node_ctx.node_index()));
+ }
+}
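+
+// Taken together, the update_read_snapshot_* methods form the read-side state
+// transition sequence used by this patch (see onSetSystemState and
+// activatePendingClusterState):
+//
+//   update_read_snapshot_before_db_pruning();        // snapshot mutable DBs
+//   removeSuperfluousBuckets(bundle, ...);           // prune into read-only DBs
+//   update_read_snapshot_after_db_pruning(bundle);   // expose pending state
+//   // ... bucket info fetching and DB merging ...
+//   update_read_snapshot_after_activation(activated_state); // stable state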
+
+bool
+StripeBucketDBUpdater::onSetSystemState(
+ const std::shared_ptr<api::SetSystemStateCommand>& cmd)
+{
+ LOG(debug,
+ "Received new cluster state %s",
+ cmd->getSystemState().toString().c_str());
+
+ const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
+ const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
+
+ if (state == oldState) {
+ return false;
+ }
+ ensureTransitionTimerStarted();
+    // Separate timer, since _transitionTimer might span multiple pending states.
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
+ update_read_snapshot_before_db_pruning();
+ const auto& bundle = cmd->getClusterStateBundle();
+ removeSuperfluousBuckets(bundle, false);
+ update_read_snapshot_after_db_pruning(bundle);
+ replyToPreviousPendingClusterStateIfAny();
+
+ auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+ _node_ctx.node_index(),
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
+ _pendingClusterState = PendingClusterState::createForClusterStateChange(
+ _node_ctx.clock(),
+ std::move(clusterInfo),
+ _sender,
+ _op_ctx.bucket_space_repo(),
+ cmd,
+ _outdatedNodesMap,
+ _op_ctx.generate_unique_timestamp());
+ _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+
+ _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
+ process_timer.getElapsedTimeAsDouble());
+
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ if (isPendingClusterStateCompleted()) {
+ processCompletedPendingClusterState();
+ }
+ return true;
+}
+
+bool
+StripeBucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+ if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
+ const auto pending_version = _pendingClusterState->clusterStateVersion();
+ if (pending_version == cmd->version()) {
+ if (isPendingClusterStateCompleted()) {
+ assert(_pendingClusterState->isDeferred());
+ activatePendingClusterState();
+ } else {
+ LOG(error, "Received cluster state activation for pending version %u "
+ "without pending state being complete yet. This is not expected, "
+ "as no activation should be sent before all distributors have "
+ "reported that state processing is complete.", pending_version);
+ replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+ return true;
+ }
+ } else {
+ replyToActivationWithActualVersion(*cmd, pending_version);
+ return true;
+ }
+ } else if (shouldDeferStateEnabling()) {
+ // Likely just a resend, but log warn for now to get a feel of how common it is.
+ LOG(warning, "Received cluster state activation command for version %u, which "
+ "has no corresponding pending state. Likely resent operation.", cmd->version());
+ } else {
+ LOG(debug, "Received cluster state activation command for version %u, but distributor "
+ "config does not have deferred activation enabled. Treating as no-op.", cmd->version());
+ }
+ // Fall through to next link in call chain that cares about this message.
+ return false;
+}
+
+StripeBucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
+{
+ if (_reply) {
+ _distributor_interface.handleCompletedMerge(_reply);
+ }
+}
+
+bool
+StripeBucketDBUpdater::onMergeBucketReply(
+ const std::shared_ptr<api::MergeBucketReply>& reply)
+{
+ auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
+
+    // In case the merge was somehow unsuccessful, or some nodes weren't
+    // actually merged (source-only nodes?), we request the bucket info for the
+    // bucket again to make sure it's OK.
+ for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
+ sendRequestBucketInfo(reply->getNodes()[i].index,
+ reply->getBucket(),
+ replyGuard);
+ }
+
+ return true;
+}
+
+void
+StripeBucketDBUpdater::enqueueRecheckUntilPendingStateEnabled(
+ uint16_t node,
+ const document::Bucket& bucket)
+{
+ LOG(spam,
+ "DB updater has a pending cluster state, enqueuing recheck "
+ "of bucket %s on node %u until state is done processing",
+ bucket.toString().c_str(),
+ node);
+ _enqueuedRechecks.insert(EnqueuedBucketRecheck(node, bucket));
+}
+
+void
+StripeBucketDBUpdater::sendAllQueuedBucketRechecks()
+{
+ LOG(spam,
+ "Sending %zu queued bucket rechecks previously received "
+ "via NotifyBucketChange commands",
+ _enqueuedRechecks.size());
+
+ for (const auto & entry :_enqueuedRechecks) {
+ sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
+ }
+ _enqueuedRechecks.clear();
+}
+
+bool
+StripeBucketDBUpdater::onNotifyBucketChange(
+ const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd)
+{
+ // Immediately schedule reply to ensure it is sent.
+ _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd));
+
+ if (!cmd->getBucketInfo().valid()) {
+ LOG(error,
+ "Received invalid bucket info for bucket %s from notify bucket "
+ "change! Not updating bucket.",
+ cmd->getBucketId().toString().c_str());
+ return true;
+ }
+ LOG(debug,
+ "Received notify bucket change from node %u for bucket %s with %s.",
+ cmd->getSourceIndex(),
+ cmd->getBucketId().toString().c_str(),
+ cmd->getBucketInfo().toString().c_str());
+
+ if (hasPendingClusterState()) {
+ enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(),
+ cmd->getBucket());
+ } else {
+ sendRequestBucketInfo(cmd->getSourceIndex(),
+ cmd->getBucket(),
+ std::shared_ptr<MergeReplyGuard>());
+ }
+
+ return true;
+}
+
+namespace {
+
+bool sort_pred(const BucketListMerger::BucketEntry& left,
+ const BucketListMerger::BucketEntry& right) {
+ return left.first < right.first;
+}
+
+}
+
+bool
+StripeBucketDBUpdater::onRequestBucketInfoReply(
+ const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+{
+ if (pendingClusterStateAccepted(repl)) {
+ return true;
+ }
+ return processSingleBucketInfoReply(repl);
+}
+
+bool
+StripeBucketDBUpdater::pendingClusterStateAccepted(
+ const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+{
+ if (_pendingClusterState.get()
+ && _pendingClusterState->onRequestBucketInfoReply(repl))
+ {
+ if (isPendingClusterStateCompleted()) {
+ processCompletedPendingClusterState();
+ }
+ return true;
+ }
+ LOG(spam,
+ "Reply %s was not accepted by pending cluster state",
+ repl->toString().c_str());
+ return false;
+}
+
+void
+StripeBucketDBUpdater::handleSingleBucketInfoFailure(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req)
+{
+ LOG(debug, "Request bucket info failed towards node %d: error was %s",
+ req.targetNode, repl->getResult().toString().c_str());
+
+ if (req.bucket.getBucketId() != document::BucketId(0)) {
+ framework::MilliSecTime sendTime(_node_ctx.clock());
+ sendTime += framework::MilliSecTime(100);
+ _delayedRequests.emplace_back(sendTime, req);
+ }
+}
+
+void
+StripeBucketDBUpdater::resendDelayedMessages()
+{
+ if (_pendingClusterState) {
+ _pendingClusterState->resendDelayedMessages();
+ }
+ if (_delayedRequests.empty()) {
+ return; // Don't fetch time if not needed
+ }
+ framework::MilliSecTime currentTime(_node_ctx.clock());
+ while (!_delayedRequests.empty()
+ && currentTime >= _delayedRequests.front().first)
+ {
+ BucketRequest& req(_delayedRequests.front().second);
+ sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
+ _delayedRequests.pop_front();
+ }
+}
+
+void
+StripeBucketDBUpdater::convertBucketInfoToBucketList(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList)
+{
+ for (const auto & entry : repl->getBucketInfo()) {
+ LOG(debug, "Received bucket information from node %u for bucket %s: %s", targetNode,
+ entry._bucketId.toString().c_str(), entry._info.toString().c_str());
+
+ newList.emplace_back(entry._bucketId, entry._info);
+ }
+}
+
+void
+StripeBucketDBUpdater::mergeBucketInfoWithDatabase(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req)
+{
+ BucketListMerger::BucketList existing;
+ BucketListMerger::BucketList newList;
+
+ findRelatedBucketsInDatabase(req.targetNode, req.bucket, existing);
+ convertBucketInfoToBucketList(repl, req.targetNode, newList);
+
+ std::sort(existing.begin(), existing.end(), sort_pred);
+ std::sort(newList.begin(), newList.end(), sort_pred);
+
+ BucketListMerger merger(newList, existing, req.timestamp);
+ updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger);
+}
+
+bool
+StripeBucketDBUpdater::processSingleBucketInfoReply(
+ const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+{
+ auto iter = _sentMessages.find(repl->getMsgId());
+
+    // The request has probably been deleted earlier for some reason.
+ if (iter == _sentMessages.end()) {
+ return true;
+ }
+
+ BucketRequest req = iter->second;
+ _sentMessages.erase(iter);
+
+ if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
+ // Ignore replies from nodes that are down.
+ return true;
+ }
+ if (repl->getResult().getResult() != api::ReturnCode::OK) {
+ handleSingleBucketInfoFailure(repl, req);
+ return true;
+ }
+ mergeBucketInfoWithDatabase(repl, req);
+ return true;
+}
+
+void
+StripeBucketDBUpdater::addBucketInfoForNode(
+ const BucketDatabase::Entry& e,
+ uint16_t node,
+ BucketListMerger::BucketList& existing) const
+{
+ const BucketCopy* copy(e->getNode(node));
+ if (copy) {
+ existing.emplace_back(e.getBucketId(), copy->getBucketInfo());
+ }
+}
+
+void
+StripeBucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing)
+{
+ auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
+ std::vector<BucketDatabase::Entry> entries;
+ distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
+
+ for (const BucketDatabase::Entry & entry : entries) {
+ addBucketInfoForNode(entry, node, existing);
+ }
+}
+
+void
+StripeBucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
+{
+ for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
+ document::Bucket bucket(bucketSpace, bucketId);
+ _op_ctx.remove_node_from_bucket_database(bucket, node);
+ }
+
+ for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
+ document::Bucket bucket(bucketSpace, entry.first);
+ _op_ctx.update_bucket_database(
+ bucket,
+ BucketCopy(merger.getTimestamp(), node, entry.second),
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
+ }
+}
+
+bool
+StripeBucketDBUpdater::isPendingClusterStateCompleted() const
+{
+ return _pendingClusterState.get() && _pendingClusterState->done();
+}
+
+void
+StripeBucketDBUpdater::processCompletedPendingClusterState()
+{
+ if (_pendingClusterState->isDeferred()) {
+ LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
+ _pendingClusterState->clusterStateVersion());
+ assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+ // Sending down SetSystemState command will reach the state manager and a reply
+ // will be auto-sent back to the cluster controller in charge. Once this happens,
+ // it will send an explicit activation command once all distributors have reported
+ // that their pending cluster states have completed.
+ // A booting distributor will treat itself as "system Up" before the state has actually
+        // taken effect via activation. The external operation handler will keep operations
+        // from actually being scheduled until the state has been activated, and needs to be
+        // explicitly aware of the case where no state has yet been activated.
+ _distributor_interface.getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ _pendingClusterState->clearCommand();
+ return;
+ }
+ // Distribution config change or non-deferred cluster state. Immediately activate
+ // the pending state without being told to do so explicitly.
+ activatePendingClusterState();
+}
+
+void
+StripeBucketDBUpdater::activatePendingClusterState()
+{
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
+
+ _pendingClusterState->mergeIntoBucketDatabases();
+ maybe_inject_simulated_db_merging_delay();
+
+ if (_pendingClusterState->isVersionedTransition()) {
+ LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
+ enableCurrentClusterStateBundleInDistributor();
+ if (_pendingClusterState->hasCommand()) {
+ _distributor_interface.getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ }
+ addCurrentStateToClusterStateHistory();
+ } else {
+ LOG(debug, "Activating pending distribution config");
+ // TODO distribution changes cannot currently be deferred as they are not
+ // initiated by the cluster controller!
+ _distributor_interface.notifyDistributionChangeEnabled();
+ }
+
+ update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
+ _pendingClusterState.reset();
+ _outdatedNodesMap.clear();
+ _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); // TODO also read only bucket space..?
+ sendAllQueuedBucketRechecks();
+ completeTransitionTimer();
+ clearReadOnlyBucketRepoDatabases();
+
+ _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
+ process_timer.getElapsedTimeAsDouble());
+}
+
+void
+StripeBucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
+{
+ const lib::ClusterStateBundle& state(
+ _pendingClusterState->getNewClusterStateBundle());
+
+ LOG(debug,
+ "BucketDBUpdater finished processing state %s",
+ state.getBaselineClusterState()->toString().c_str());
+
+ _distributor_interface.enableClusterStateBundle(state);
+}
+
+void StripeBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
+ update_read_snapshot_after_activation(activated_state);
+ _distributor_interface.enableClusterStateBundle(activated_state);
+}
+
+void
+StripeBucketDBUpdater::addCurrentStateToClusterStateHistory()
+{
+ _history.push_back(_pendingClusterState->getSummary());
+
+ if (_history.size() > 50) {
+ _history.pop_front();
+ }
+}
+
+vespalib::string
+StripeBucketDBUpdater::getReportContentType(const framework::HttpUrlPath&) const
+{
+ return "text/xml";
+}
+
+namespace {
+
+const vespalib::string ALL = "all";
+const vespalib::string BUCKETDB = "bucketdb";
+const vespalib::string BUCKETDB_UPDATER = "Bucket Database Updater";
+
+}
+
+void
+StripeBucketDBUpdater::BucketRequest::print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const
+{
+ using namespace vespalib::xml;
+ xos << XmlTag("storagenode")
+ << XmlAttribute("index", targetNode);
+ xos << XmlAttribute("bucketspace", bucket.getBucketSpace().getId(), XmlAttribute::HEX);
+ if (bucket.getBucketId().getRawId() == 0) {
+ xos << XmlAttribute("bucket", ALL);
+ } else {
+ xos << XmlAttribute("bucket", bucket.getBucketId().getId(), XmlAttribute::HEX);
+ }
+ xos << timestampAttribute << XmlEndTag();
+}
+
+bool
+StripeBucketDBUpdater::reportStatus(std::ostream& out,
+ const framework::HttpUrlPath& path) const
+{
+ using namespace vespalib::xml;
+ XmlOutputStream xos(out);
+ // FIXME(vekterli): have to do this manually since we cannot inherit
+ // directly from XmlStatusReporter due to data races when BucketDBUpdater
+ // gets status requests directly.
+ xos << XmlTag("status")
+ << XmlAttribute("id", BUCKETDB)
+ << XmlAttribute("name", BUCKETDB_UPDATER);
+ reportXmlStatus(xos, path);
+ xos << XmlEndTag();
+ return true;
+}
+
+vespalib::string
+StripeBucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
+ const framework::HttpUrlPath&) const
+{
+ using namespace vespalib::xml;
+ xos << XmlTag("bucketdb")
+ << XmlTag("systemstate_active")
+ << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
+ << XmlEndTag();
+ if (_pendingClusterState) {
+ xos << *_pendingClusterState;
+ }
+ xos << XmlTag("systemstate_history");
+ for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) {
+ xos << XmlTag("change")
+ << XmlAttribute("from", i->_prevClusterState)
+ << XmlAttribute("to", i->_newClusterState)
+ << XmlAttribute("processingtime", i->_processingTime)
+ << XmlEndTag();
+ }
+ xos << XmlEndTag()
+ << XmlTag("single_bucket_requests");
+ for (const auto & entry : _sentMessages)
+ {
+ entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp));
+ }
+ xos << XmlEndTag()
+ << XmlTag("delayed_single_bucket_requests");
+ for (const auto & entry : _delayedRequests)
+ {
+ entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime()));
+ }
+ xos << XmlEndTag() << XmlEndTag();
+ return "";
+}
+
+StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
+ const lib::ClusterState& oldState,
+ const lib::ClusterState& s,
+ uint16_t localIndex,
+ const lib::Distribution& distribution,
+ const char* upStates,
+ bool track_non_owned_entries)
+ : _oldState(oldState),
+ _state(s),
+ _available_nodes(),
+ _nonOwnedBuckets(),
+ _removed_buckets(0),
+ _removed_documents(0),
+ _localIndex(localIndex),
+ _distribution(distribution),
+ _upStates(upStates),
+ _track_non_owned_entries(track_non_owned_entries),
+ _cachedDecisionSuperbucket(UINT64_MAX),
+ _cachedOwned(false)
+{
+ // TODO intersection of cluster state and distribution config
+ const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
+ _available_nodes.resize(storage_count);
+ for (uint16_t i = 0; i < storage_count; ++i) {
+ if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) {
+ _available_nodes[i] = true;
+ }
+ }
+}
+
+void
+StripeBucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
+{
+ LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
+}
+
+namespace {
+
+uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
+ // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
+ return id.getRawId() & ~(UINT64_MAX << distribution_bits);
+}
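+
+// Example (illustrative): with distribution_bits == 16 the mask is
+// ~(UINT64_MAX << 16) == 0xFFFF, so superbucket_from_id(id, 16) yields the
+// 16 least significant bits of the raw bucket ID.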
+
+}
+
+bool
+StripeBucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
+ const document::BucketId& bucketId) const
+{
+ // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
+ // TODO "too few bits used" case can be cheaply checked without needing exception
+ try {
+ const auto bits = _state.getDistributionBitCount();
+ const auto this_superbucket = superbucket_from_id(bucketId, bits);
+ if (_cachedDecisionSuperbucket == this_superbucket) {
+ if (!_cachedOwned) {
+ logRemove(bucketId, "bucket now owned by another distributor (cached)");
+ }
+ return _cachedOwned;
+ }
+
+ uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim");
+ _cachedDecisionSuperbucket = this_superbucket;
+ _cachedOwned = (distributor == _localIndex);
+ if (!_cachedOwned) {
+ logRemove(bucketId, "bucket now owned by another distributor");
+ return false;
+ }
+ return true;
+ } catch (lib::TooFewBucketBitsInUseException& exc) {
+ logRemove(bucketId, "using too few distribution bits now");
+ } catch (lib::NoDistributorsAvailableException& exc) {
+ logRemove(bucketId, "no distributors are available");
+ }
+ return false;
+}
+
+void
+StripeBucketDBUpdater::MergingNodeRemover::setCopiesInEntry(
+ BucketDatabase::Entry& e,
+ const std::vector<BucketCopy>& copies) const
+{
+ e->clear();
+
+ std::vector<uint16_t> order =
+ _distribution.getIdealStorageNodes(_state, e.getBucketId(), _upStates);
+
+ e->addNodes(copies, order);
+
+ LOG(spam, "Changed %s", e->toString().c_str());
+}
+
+bool
+StripeBucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const
+{
+ const uint16_t n_nodes = e->getNodeCount();
+ for (uint16_t i = 0; i < n_nodes; i++) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (!storage_node_is_available(node_idx)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+BucketDatabase::MergingProcessor::Result
+StripeBucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger)
+{
+ document::BucketId bucketId(merger.bucket_id());
+ LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str());
+ if (!distributorOwnsBucket(bucketId)) {
+ // TODO remove in favor of DB snapshotting
+ if (_track_non_owned_entries) {
+ _nonOwnedBuckets.emplace_back(merger.current_entry());
+ }
+ return Result::Skip;
+ }
+ auto& e = merger.current_entry();
+
+ if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger?
+ return Result::Skip;
+ }
+
+ if (!has_unavailable_nodes(e)) {
+ return Result::KeepUnchanged;
+ }
+
+ std::vector<BucketCopy> remainingCopies;
+ for (uint16_t i = 0; i < e->getNodeCount(); i++) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (storage_node_is_available(node_idx)) {
+ remainingCopies.push_back(e->getNodeRef(i));
+ }
+ }
+
+ if (remainingCopies.empty()) {
+ ++_removed_buckets;
+ _removed_documents += e->getHighestDocumentCount();
+ return Result::Skip;
+ } else {
+ setCopiesInEntry(e, remainingCopies);
+ return Result::Update;
+ }
+}
+
+bool
+StripeBucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept
+{
+ return ((index < _available_nodes.size()) && _available_nodes[index]);
+}
+
+StripeBucketDBUpdater::MergingNodeRemover::~MergingNodeRemover()
+{
+ if (_removed_buckets != 0) {
+ LOGBM(info, "After cluster state change %s, %zu buckets no longer "
+ "have available replicas. %zu documents in these buckets will "
+ "be unavailable until nodes come back up",
+ _oldState.getTextualDifference(_state).c_str(),
+ _removed_buckets, _removed_documents);
+ }
+}
+
+} // distributor
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
new file mode 100644
index 00000000000..e34d28a69bc
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -0,0 +1,284 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "bucketlistmerger.h"
+#include "distributor_stripe_component.h"
+#include "distributormessagesender.h"
+#include "messageguard.h"
+#include "operation_routing_snapshot.h"
+#include "outdated_nodes_map.h"
+#include "pendingclusterstate.h"
+#include "potential_data_loss_report.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/messageapi/messagehandler.h>
+#include <vespa/storageframework/generic/clock/timer.h>
+#include <vespa/storageframework/generic/status/statusreporter.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <atomic>
+#include <list>
+#include <mutex>
+
+namespace vespalib::xml {
+class XmlOutputStream;
+class XmlAttribute;
+}
+
+namespace storage::distributor {
+
+class DistributorStripeInterface;
+class BucketSpaceDistributionContext;
+
+class StripeBucketDBUpdater final
+ : public framework::StatusReporter,
+ public api::MessageHandler
+{
+public:
+ using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
+ StripeBucketDBUpdater(DistributorStripeInterface& owner,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ DistributorMessageSender& sender,
+ DistributorComponentRegister& compReg);
+ ~StripeBucketDBUpdater() override;
+
+ void flush();
+ const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const;
+ void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
+
+ bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
+ bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
+ bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
+ bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
+ void resendDelayedMessages();
+ void storageDistributionChanged();
+
+ vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
+ vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
+ bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+ void print(std::ostream& out, bool verbose, const std::string& indent) const;
+ const DistributorNodeContext& node_context() const { return _node_ctx; }
+ DistributorOperationContext& operation_context() { return _op_ctx; }
+
+ /**
+ * Returns whether the current PendingClusterState indicates that there has
+ * been a transfer of bucket ownership amongst the distributors in the
+     * cluster. This method only makes sense to call when _pendingClusterState
+     * is active, such as from within an enableClusterState() call.
+ */
+ bool bucketOwnershipHasChanged() const {
+ return ((_pendingClusterState.get() != nullptr)
+ && _pendingClusterState->hasBucketOwnershipTransfer());
+ }
+ void set_stale_reads_enabled(bool enabled) noexcept {
+ _stale_reads_enabled.store(enabled, std::memory_order_relaxed);
+ }
+ bool stale_reads_enabled() const noexcept {
+ return _stale_reads_enabled.load(std::memory_order_relaxed);
+ }
+
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
+private:
+ class MergeReplyGuard {
+ public:
+ MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
+ : _distributor_interface(distributor_interface), _reply(reply) {}
+
+ ~MergeReplyGuard();
+
+ // Used when we're flushing and simply want to drop the reply rather
+ // than send it down
+ void resetReply() { _reply.reset(); }
+ private:
+ DistributorStripeInterface& _distributor_interface;
+ std::shared_ptr<api::MergeBucketReply> _reply;
+ };
+
+ struct BucketRequest {
+ BucketRequest()
+            : targetNode(0), bucket(), timestamp(0) {}
+
+ BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
+ const std::shared_ptr<MergeReplyGuard>& guard)
+ : targetNode(t),
+ bucket(b),
+ timestamp(currentTime),
+              _mergeReplyGuard(guard) {}
+
+ void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const;
+ uint16_t targetNode;
+ document::Bucket bucket;
+ uint64_t timestamp;
+
+ std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
+ };
+
+ struct EnqueuedBucketRecheck {
+ uint16_t node;
+ document::Bucket bucket;
+
+ EnqueuedBucketRecheck() : node(0), bucket() {}
+
+ EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket)
+ : node(_node),
+ bucket(_bucket)
+ {}
+
+ bool operator<(const EnqueuedBucketRecheck& o) const {
+ if (node != o.node) {
+ return node < o.node;
+ }
+ return bucket < o.bucket;
+ }
+ bool operator==(const EnqueuedBucketRecheck& o) const {
+ return node == o.node && bucket == o.bucket;
+ }
+ };
+
+ friend class DistributorTestUtil;
+ // TODO refactor and rewire to avoid needing this direct meddling
+ friend class LegacySingleStripeAccessGuard;
+ // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
+ // components agree on the currently active cluster state bundle.
+ // Transitively invokes Distributor::enableClusterStateBundle
+ void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
+
+ bool shouldDeferStateEnabling() const noexcept;
+ bool hasPendingClusterState() const;
+ bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ bool isPendingClusterStateCompleted() const;
+ void processCompletedPendingClusterState();
+ void activatePendingClusterState();
+ void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList);
+ void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReply);
+ void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
+ BucketListMerger::BucketList& existing) const;
+ void ensureTransitionTimerStarted();
+ void completeTransitionTimer();
+ void clearReadOnlyBucketRepoDatabases();
+ /**
+     * Adds all buckets in the bucket database that are either contained in
+     * bucketId or contain bucketId, and that have copies on the given node.
+ */
+ void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing);
+
+ /**
+ Updates the bucket database from the information generated by the given
+ bucket list merger.
+ */
+ void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
+
+ void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+
+ void update_read_snapshot_before_db_pruning();
+ void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
+ bool is_distribution_config_change);
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
+
+ // TODO STRIPE only called when stripe guard is held
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change);
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries);
+
+ void replyToPreviousPendingClusterStateIfAny();
+ void replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion);
+
+ void enableCurrentClusterStateBundleInDistributor();
+ void addCurrentStateToClusterStateHistory();
+ void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
+ void sendAllQueuedBucketRechecks();
+
+ void maybe_inject_simulated_db_pruning_delay();
+ void maybe_inject_simulated_db_merging_delay();
+
+ /**
+ Removes all copies of buckets that are on nodes that are down.
+ */
+ class MergingNodeRemover : public BucketDatabase::MergingProcessor {
+ public:
+ MergingNodeRemover(const lib::ClusterState& oldState,
+ const lib::ClusterState& s,
+ uint16_t localIndex,
+ const lib::Distribution& distribution,
+ const char* upStates,
+ bool track_non_owned_entries);
+ ~MergingNodeRemover() override;
+
+ Result merge(BucketDatabase::Merger&) override;
+ void logRemove(const document::BucketId& bucketId, const char* msg) const;
+ bool distributorOwnsBucket(const document::BucketId&) const;
+
+ const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
+ return _nonOwnedBuckets;
+ }
+ size_t removed_buckets() const noexcept { return _removed_buckets; }
+ size_t removed_documents() const noexcept { return _removed_documents; }
+ private:
+ void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
+
+ bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
+ bool storage_node_is_available(uint16_t index) const noexcept;
+
+ const lib::ClusterState _oldState;
+ const lib::ClusterState _state;
+ std::vector<bool> _available_nodes;
+ std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
+ size_t _removed_buckets;
+ size_t _removed_documents;
+
+ uint16_t _localIndex;
+ const lib::Distribution& _distribution;
+ const char* _upStates;
+ bool _track_non_owned_entries;
+
+ mutable uint64_t _cachedDecisionSuperbucket;
+ mutable bool _cachedOwned;
+ };
+
+ DistributorStripeComponent _distributorComponent;
+ const DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
+ DistributorStripeInterface& _distributor_interface;
+ std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
+ std::map<uint64_t, BucketRequest> _sentMessages;
+ std::unique_ptr<PendingClusterState> _pendingClusterState;
+ std::list<PendingClusterState::Summary> _history;
+ DistributorMessageSender& _sender;
+ std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
+ OutdatedNodesMap _outdatedNodesMap;
+ framework::MilliSecTimer _transitionTimer;
+ std::atomic<bool> _stale_reads_enabled;
+ using DistributionContexts = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketSpaceDistributionContext>,
+ document::BucketSpace::hash>;
+ DistributionContexts _active_distribution_contexts;
+ using DbGuards = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketDatabase::ReadGuard>,
+ document::BucketSpace::hash>;
+ DbGuards _explicit_transition_read_guard;
+ mutable std::mutex _distribution_context_mutex;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index fbbc7c366fd..367a325939e 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -107,6 +107,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
builder.add(std::move(stateManager));
}
+// FIXME STRIPE not thread safe!!
api::Timestamp
DistributorNode::getUniqueTimestamp()
{