Diffstat (limited to 'storage/src')
-rw-r--r--  storage/src/tests/distributor/bucketdbupdatertest.cpp                        |    2
-rw-r--r--  storage/src/tests/distributor/distributortest.cpp                            |   39
-rw-r--r--  storage/src/tests/distributor/distributortestutil.cpp                        |    4
-rw-r--r--  storage/src/tests/distributor/distributortestutil.h                          |    4
-rw-r--r--  storage/src/vespa/storage/config/distributorconfiguration.cpp                |    3
-rw-r--r--  storage/src/vespa/storage/config/distributorconfiguration.h                  |    4
-rw-r--r--  storage/src/vespa/storage/distributor/CMakeLists.txt                         |    3
-rw-r--r--  storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp  |   17
-rw-r--r--  storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h    |   27
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.cpp                    |  768
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.h                      |  172
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.cpp                        |   84
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.h                          |   13
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_bucket_space.h             |    1
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_stripe.cpp                 |   17
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_stripe.h                   |   17
-rw-r--r--  storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp      |   88
-rw-r--r--  storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h        |   67
-rw-r--r--  storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp |   68
-rw-r--r--  storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h   |  109
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.cpp                |   14
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.h                  |    3
-rw-r--r--  storage/src/vespa/storage/distributor/potential_data_loss_report.h           |   22
-rw-r--r--  storage/src/vespa/storage/distributor/stripe_access_guard.h                  |   68
-rw-r--r--  storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp           | 1132
-rw-r--r--  storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h             |  284
-rw-r--r--  storage/src/vespa/storage/storageserver/distributornode.cpp                  |    1
27 files changed, 2281 insertions(+), 750 deletions(-)
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 4ec49b5c6f8..97fccf58901 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -1116,7 +1116,7 @@ TEST_F(BucketDBUpdaterTest, notify_bucket_change_from_node_down) {
getBucketDBUpdater().onNotifyBucketChange(cmd);
}
// Enable here to avoid having request bucket info be silently swallowed
- // (sendRequestBucketInfo drops message if node is down).
+ // (send_request_bucket_info drops message if node is down).
enableDistributorClusterState("distributor:1 storage:2 .0.s:d");
ASSERT_EQ(std::string("BucketId(0x4000000000000001) : "
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 4c574609df5..7958306db5f 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -239,6 +239,7 @@ DistributorTest::DistributorTest()
DistributorTest::~DistributorTest() = default;
+// TODO -> stripe test
TEST_F(DistributorTest, operation_generation) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -257,6 +258,7 @@ TEST_F(DistributorTest, operation_generation) {
EXPECT_EQ("Visitor Create", testOp(cmd));
}
+// TODO -> stripe test
TEST_F(DistributorTest, operations_generated_and_started_without_duplicates) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -271,6 +273,8 @@ TEST_F(DistributorTest, operations_generated_and_started_without_duplicates) {
ASSERT_EQ(6, _sender.commands().size());
}
+// TODO -> stripe test
+// TODO also need to impl/test cross-stripe cluster state changes
TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
setupDistributor(Redundancy(1), NodeCount(2),
"storage:1 .0.s:d distributor:1");
@@ -291,6 +295,8 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
EXPECT_TRUE(_distributor->isInRecoveryMode());
}
+// TODO -> stripe test
+// TODO how to throttle across stripes?
TEST_F(DistributorTest, operations_are_throttled) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
getConfig().setMinPendingMaintenanceOps(1);
@@ -303,6 +309,7 @@ TEST_F(DistributorTest, operations_are_throttled) {
ASSERT_EQ(1, _sender.commands().size());
}
+// TODO -> stripe test
TEST_F(DistributorTest, handle_unknown_maintenance_reply) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -322,6 +329,7 @@ TEST_F(DistributorTest, handle_unknown_maintenance_reply) {
}
}
+// TODO -> generic, non distr/stripe test
TEST_F(DistributorTest, contains_time_statement) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -333,6 +341,7 @@ TEST_F(DistributorTest, contains_time_statement) {
EXPECT_TRUE(getConfig().containsTimeStatement("testdoctype1.headerfield == now() - 3600"));
}
+// TODO -> stripe test
TEST_F(DistributorTest, update_bucket_database) {
enableDistributorClusterState("distributor:1 storage:3");
@@ -402,6 +411,8 @@ public:
}
+// TODO -> stripe test
+// TODO need to impl/test cross-stripe status requests
TEST_F(DistributorTest, tick_processes_status_requests) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -430,6 +441,8 @@ TEST_F(DistributorTest, tick_processes_status_requests) {
EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)"));
}
+// TODO -> distributor test since it owns metric hook
+// TODO need to impl/test cross-stripe metrics aggregation
TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
// To ensure we count all operations, not just those fitting within the
@@ -484,6 +497,7 @@ uint64_t db_sample_interval_sec(const Distributor& d) noexcept {
}
+// TODO -> stripe test
TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) {
getClock().setAbsoluteTimeInSeconds(1000);
@@ -521,6 +535,8 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim
EXPECT_GT(now_used, last_used);
}
+// TODO -> stripe test
+// TODO need to impl/test cross-stripe config propagation
TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configuration) {
using namespace vespa::config::content::core;
@@ -557,6 +573,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati
EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets));
}
+// TODO -> stripe test
TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2");
lib::ClusterState newState("storage:10 distributor:10");
@@ -578,6 +595,7 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state
EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket));
}
+// TODO -> stripe test
TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_current_time) {
setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2");
getClock().setAbsoluteTimeInSeconds(101234);
@@ -591,6 +609,7 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur
EXPECT_EQ(101234, e->getLastGarbageCollectionTime());
}
+// TODO -> stripe test
TEST_F(DistributorTest, merge_stats_are_accumulated_during_database_iteration) {
setupDistributor(Redundancy(2), NodeCount(3), "storage:3 distributor:1");
// Copies out of sync. Not possible for distributor to _reliably_ tell
@@ -662,6 +681,7 @@ DistributorTest::assertBucketSpaceStats(size_t expBucketPending, size_t expBucke
* their state checkers at all, we won't get any statistics from any other
* operations for the bucket.
*/
+// TODO -> stripe test
TEST_F(DistributorTest, stats_generated_for_preempted_operations) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
// For this test it suffices to have a single bucket with multiple aspects
@@ -686,6 +706,7 @@ TEST_F(DistributorTest, stats_generated_for_preempted_operations) {
}
}
+// TODO -> distributor test
TEST_F(DistributorTest, host_info_reporter_config_is_propagated_to_reporter) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -699,11 +720,13 @@ TEST_F(DistributorTest, host_info_reporter_config_is_propagated_to_reporter) {
EXPECT_FALSE(distributor_host_info_reporter().isReportingEnabled());
}
+// TODO -> stripe test (though config is a bit of a special case...)
TEST_F(DistributorTest, replica_counting_mode_is_configured_to_trusted_by_default) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::TRUSTED, currentReplicaCountingMode());
}
+// TODO -> stripe test
TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_updater) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
ConfigBuilder builder;
@@ -712,6 +735,7 @@ TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_upd
EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::ANY, currentReplicaCountingMode());
}
+// TODO -> stripe test
TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_propagated_to_internal_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
ConfigBuilder builder;
@@ -720,11 +744,13 @@ TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_
EXPECT_EQ(getConfig().max_consecutively_inhibited_maintenance_ticks(), 123);
}
+// TODO -> stripe test
TEST_F(DistributorTest, bucket_activation_is_enabled_by_default) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
EXPECT_FALSE(getConfig().isBucketActivationDisabled());
}
+// TODO -> stripe test
TEST_F(DistributorTest, bucket_activation_config_is_propagated_to_distributor_configuration) {
using namespace vespa::config::content::core;
@@ -747,6 +773,7 @@ DistributorTest::configureMaxClusterClockSkew(int seconds) {
_distributor->enableNextConfig();
}
+// TODO -> stripe test
TEST_F(DistributorTest, max_clock_skew_config_is_propagated_to_distributor_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -817,6 +844,7 @@ void DistributorTest::assertNoMessageBounced() {
// TODO refactor this to set proper highest timestamp as part of bucket info
// reply once we have the "highest timestamp across all owned buckets" feature
// in place.
+// TODO where does this truly belong?
TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) {
setupDistributor(Redundancy(2), NodeCount(2),
"bits:1 storage:1 distributor:2");
@@ -846,6 +874,7 @@ void DistributorTest::configure_mutation_sequencing(bool enabled) {
_distributor->enableNextConfig();
}
+// TODO -> stripe test
TEST_F(DistributorTest, sequencing_config_is_propagated_to_distributor_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -871,6 +900,7 @@ DistributorTest::configure_merge_busy_inhibit_duration(int seconds) {
_distributor->enableNextConfig();
}
+// TODO -> stripe test
TEST_F(DistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_distributor_config) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
@@ -878,6 +908,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_dist
EXPECT_EQ(getConfig().getInhibitMergesOnBusyNodeDuration(), std::chrono::seconds(7));
}
+// TODO -> stripe test
TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1");
addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t");
@@ -903,6 +934,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes
EXPECT_FALSE(node_info.isBusy(0));
}
+// TODO -> stripe test
TEST_F(DistributorTest, external_client_requests_are_handled_individually_in_priority_order) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a");
@@ -931,6 +963,7 @@ TEST_F(DistributorTest, external_client_requests_are_handled_individually_in_pri
EXPECT_THAT(actual, ContainerEq(expected));
}
+// TODO -> stripe test
TEST_F(DistributorTest, internal_messages_are_started_in_fifo_order_batch) {
// To test internal request ordering, we use NotifyBucketChangeCommand
// for the reason that it explicitly updates the bucket database for
@@ -959,6 +992,8 @@ TEST_F(DistributorTest, internal_messages_are_started_in_fifo_order_batch) {
EXPECT_EQ(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
}
+// TODO -> stripe test
+// TODO also test that closing distributor closes stripes
TEST_F(DistributorTest, closing_aborts_priority_queued_client_requests) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
document::BucketId bucket(16, 1);
@@ -997,6 +1032,9 @@ void assert_invalid_stats_for_all_spaces(
}
+// TODO -> stripe test
+// TODO must impl/test cross-stripe bucket space stats
+// TODO cross-stripe recovery mode handling how?
TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
// Set up a cluster state + DB contents which implies merge maintenance ops
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
@@ -1018,6 +1056,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
assert_invalid_stats_for_all_spaces(stats, 2);
}
+// TODO figure out interaction between stripes and distributors on this one
TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_replies) {
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
// Should not send explicit replies during init stage
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 7929cc1c906..8a0e18c86af 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -66,7 +66,7 @@ DistributorTestUtil::setup_distributor(int redundancy,
// This is for all intents and purposes a hack to avoid having the
// distributor treat setting the distribution explicitly as a signal that
// it should send RequestBucketInfo to all configured nodes.
- // If we called storageDistributionChanged followed by enableDistribution
+ // If we called storage_distribution_changed followed by enableDistribution
// explicitly (which is what happens in "real life"), that is what would
// take place.
// The inverse case of this can be explicitly accomplished by calling
@@ -338,7 +338,7 @@ DistributorTestUtil::disableBucketActivationInConfig(bool disable)
getConfig().configure(config);
}
-BucketDBUpdater&
+StripeBucketDBUpdater&
DistributorTestUtil::getBucketDBUpdater() {
return _distributor->bucket_db_updater();
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index d3c0445d5b5..ddf153a1406 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -17,7 +17,7 @@ namespace framework { struct TickingThreadPool; }
namespace distributor {
-class BucketDBUpdater;
+class StripeBucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
@@ -112,7 +112,7 @@ public:
int idx = -1,
api::ReturnCode::Result result = api::ReturnCode::OK);
- BucketDBUpdater& getBucketDBUpdater();
+ StripeBucketDBUpdater& getBucketDBUpdater();
IdealStateManager& getIdealStateManager();
ExternalOperationHandler& getExternalOperationHandler();
storage::distributor::DistributorStripeComponent& distributor_component();
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index ea963b227e2..53f2e7c8413 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -31,6 +31,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_maxPendingMaintenanceOps(1000),
_maxVisitorsPerNodePerClientVisitor(4),
_minBucketsPerVisitor(5),
+ _num_distributor_stripes(0),
_maxClusterClockSkew(0),
_inhibitMergeSendingOnBusyNodeDuration(60s),
_simulated_db_pruning_latency(0),
@@ -181,6 +182,8 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
}
_simulated_db_pruning_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbPruningLatencyMsec));
_simulated_db_merging_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbMergingLatencyMsec));
+
+ _num_distributor_stripes = std::max(0, config.numDistributorStripes); // TODO STRIPE test
LOG(debug,
"Distributor now using new configuration parameters. Split limits: %d docs/%d bytes. "
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 9c1456fa9fd..479298ff082 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -265,6 +265,8 @@ public:
return _enable_revert;
}
+ uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -297,6 +299,8 @@ private:
uint32_t _maxVisitorsPerNodePerClientVisitor;
uint32_t _minBucketsPerVisitor;
+ uint32_t _num_distributor_stripes;
+
MaintenancePriorities _maintenancePriorities;
std::chrono::seconds _maxClusterClockSkew;
std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration;
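A minimal sketch of how a caller might consume the new num_distributor_stripes() knob added above; the wrapping function and the zero-means-disabled reading are assumptions for illustration, not part of this diff:

// Hypothetical illustration only. The constructor change above
// initializes the value to 0, which we read here as "striping disabled".
void maybe_set_up_stripes(const DistributorConfiguration& config) {
    uint32_t n_stripes = config.num_distributor_stripes();
    if (n_stripes > 0) {
        // construct n_stripes DistributorStripe instances and
        // partition buckets across them (not shown)
    }
}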
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index d82d14831f4..57d6a23c79f 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(storage_distributor
activecopy.cpp
blockingoperationstarter.cpp
bucket_db_prune_elision.cpp
+ bucket_space_distribution_configs.cpp
bucket_space_distribution_context.cpp
bucketdbupdater.cpp
bucketgctimecalculator.cpp
@@ -23,6 +24,7 @@ vespa_add_library(storage_distributor
ideal_service_layer_nodes_bundle.cpp
idealstatemanager.cpp
idealstatemetricsset.cpp
+ legacy_single_stripe_accessor.cpp
messagetracker.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
@@ -40,6 +42,7 @@ vespa_add_library(storage_distributor
statechecker.cpp
statecheckers.cpp
statusreporterdelegate.cpp
+ stripe_bucket_db_updater.cpp
throttlingoperationstarter.cpp
update_metric_set.cpp
visitormetricsset.cpp
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp
new file mode 100644
index 00000000000..da5769411d4
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp
@@ -0,0 +1,17 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_space_distribution_configs.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
+#include <vespa/vdslib/distribution/distribution.h>
+
+namespace storage::distributor {
+
+BucketSpaceDistributionConfigs
+BucketSpaceDistributionConfigs::from_default_distribution(std::shared_ptr<const lib::Distribution> distribution) {
+ BucketSpaceDistributionConfigs ret;
+ ret.space_configs.emplace(document::FixedBucketSpaces::global_space(), GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution));
+ ret.space_configs.emplace(document::FixedBucketSpaces::default_space(), std::move(distribution));
+ return ret;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h
new file mode 100644
index 00000000000..9ebd8ef9732
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/bucket/bucketspace.h>
+#include <map>
+#include <memory>
+
+namespace storage::lib { class Distribution; }
+
+namespace storage::distributor {
+
+/**
+ * Represents a complete mapping of all known bucket spaces to their appropriate,
+ * (possibly derived) distribution config.
+ */
+struct BucketSpaceDistributionConfigs {
+ std::map<document::BucketSpace, std::shared_ptr<const lib::Distribution>> space_configs;
+
+ std::shared_ptr<const lib::Distribution> get_or_nullptr(document::BucketSpace space) const noexcept {
+ auto iter = space_configs.find(space);
+ return (iter != space_configs.end()) ? iter->second : std::shared_ptr<const lib::Distribution>();
+ }
+
+ static BucketSpaceDistributionConfigs from_default_distribution(std::shared_ptr<const lib::Distribution>);
+};
+
+}
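A short usage sketch for the struct above; the variable names and the propagation step are assumptions, but the calls mirror the API introduced in this diff (from_default_distribution() derives the global-space config via GlobalBucketSpaceDistributionConverter, as the .cpp shows):

// Hypothetical usage: build the full per-space mapping from a newly
// configured default-space distribution, then look up one space.
auto configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(new_default_distribution));
if (auto global_cfg = configs.get_or_nullptr(document::FixedBucketSpaces::global_space())) {
    // e.g. hand the whole mapping to the stripes under a guard:
    // guard->update_distribution_config(configs);
}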
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 6735ea1e533..6dcb5ba732e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,12 +2,15 @@
#include "bucketdbupdater.h"
#include "bucket_db_prune_elision.h"
+#include "bucket_space_distribution_configs.h"
#include "bucket_space_distribution_context.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
#include "simpleclusterinformation.h"
+#include "stripe_access_guard.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/vdslib/distribution/distribution.h>
@@ -24,74 +27,71 @@ using document::BucketSpace;
namespace storage::distributor {
-BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, // FIXME STRIPE!
DistributorMessageSender& sender,
- DistributorComponentRegister& compReg)
- : framework::StatusReporter("bucketdb", "Bucket DB Updater"),
- _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
- _node_ctx(_distributorComponent),
- _op_ctx(_distributorComponent),
- _distributor_interface(_distributorComponent.getDistributor()),
- _delayedRequests(),
- _sentMessages(),
- _pendingClusterState(),
+ DistributorComponentRegister& comp_reg,
+ StripeAccessor& stripe_accessor)
+ : framework::StatusReporter("temp_bucketdb", "Bucket DB Updater"), // TODO STRIPE rename once duplication is removed
+ _stripe_accessor(stripe_accessor),
+ _active_state_bundle(lib::ClusterState()),
+ _dummy_mutable_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())),
+ _dummy_read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())),
+ _distributor_component(owner, *_dummy_mutable_bucket_space_repo, *_dummy_read_only_bucket_space_repo, comp_reg, "Bucket DB Updater"),
+ _node_ctx(_distributor_component),
+ _op_ctx(_distributor_component),
+ _distributor_interface(_distributor_component.getDistributor()),
+ _delayed_requests(),
+ _sent_messages(),
+ _pending_cluster_state(),
_history(),
_sender(sender),
- _enqueuedRechecks(),
- _outdatedNodesMap(),
- _transitionTimer(_node_ctx.clock()),
- _stale_reads_enabled(false),
- _active_distribution_contexts(),
- _explicit_transition_read_guard(),
- _distribution_context_mutex()
+ _enqueued_rechecks(),
+ _outdated_nodes_map(),
+ _transition_timer(_node_ctx.clock()),
+ _stale_reads_enabled(false)
{
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- _active_distribution_contexts.emplace(
- elem.first,
- BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
- _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
- }
+ // FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle!
+ propagate_active_state_bundle_internally();
+ bootstrap_distribution_config(_distributor_component.getDistribution());
}
BucketDBUpdater::~BucketDBUpdater() = default;
-OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
- const auto bucket_space = bucket.getBucketSpace();
- std::lock_guard lock(_distribution_context_mutex);
- auto active_state_iter = _active_distribution_contexts.find(bucket_space);
- assert(active_state_iter != _active_distribution_contexts.cend());
- auto& state = *active_state_iter->second;
- if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
- return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+void
+BucketDBUpdater::propagate_active_state_bundle_internally() {
+ for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) {
+ for (auto& iter : *repo) {
+ iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first));
+ }
}
- const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId());
- if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
- return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+}
+
+void
+BucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) {
+ auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
+ for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) {
+ repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
+ repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
}
- const auto& space_repo = bucket_present_in_mutable_db
- ? _op_ctx.bucket_space_repo()
- : _op_ctx.read_only_bucket_space_repo();
- auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
- assert(existing_guard_iter != _explicit_transition_read_guard.cend());
- auto db_guard = existing_guard_iter->second
- ? existing_guard_iter-> second
- : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard();
- return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo);
+ // TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition?
+ // ... need to take a guard if so, so can probably not be done at ctor time..?
}
+// TODO STRIPE what to do with merge guards...
+// FIXME what about bucket DB replica update timestamp allocations?! Replace with u64 counter..?
+// Must at the very least ensure we use stripe-local TS generation for DB inserts...! i.e. no global TS
+// Or do we have to touch these at all here? Just defer all this via stripe interface?
void
BucketDBUpdater::flush()
{
- for (auto & entry : _sentMessages) {
+ for (auto & entry : _sent_messages) {
// Cannot sendDown MergeBucketReplies during flushing, since
// all lower links have been closed
if (entry.second._mergeReplyGuard) {
entry.second._mergeReplyGuard->resetReply();
}
}
- _sentMessages.clear();
+ _sent_messages.clear();
}
void
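One possible answer to the stripe-local timestamp FIXME above, purely as an illustration (this diff does not implement it): give each stripe a disjoint timestamp stream by reserving the low bits of a per-stripe monotonic counter for the stripe index, so no global timestamp source is needed.

#include <cstdint>

// Hypothetical sketch: timestamps from different stripes can never
// collide because they always differ in the low _stripe_bits bits,
// and each stripe's stream stays strictly monotonic.
class StripeLocalTimestampGenerator {
    uint64_t       _counter = 0;
    const uint64_t _stripe_index;
    const uint64_t _stripe_bits;
public:
    StripeLocalTimestampGenerator(uint64_t stripe_index, uint64_t stripe_bits)
        : _stripe_index(stripe_index), _stripe_bits(stripe_bits) {}
    uint64_t next() noexcept {
        return (++_counter << _stripe_bits) | _stripe_index;
    }
};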
@@ -102,26 +102,19 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden
}
bool
-BucketDBUpdater::shouldDeferStateEnabling() const noexcept
+BucketDBUpdater::should_defer_state_enabling() const noexcept
{
return stale_reads_enabled();
}
bool
-BucketDBUpdater::hasPendingClusterState() const
+BucketDBUpdater::has_pending_cluster_state() const
{
- return static_cast<bool>(_pendingClusterState);
-}
-
-const lib::ClusterState*
-BucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const {
- return (hasPendingClusterState()
- ? _pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(space).get()
- : nullptr);
+ return static_cast<bool>(_pending_cluster_state);
}
void
-BucketDBUpdater::sendRequestBucketInfo(
+BucketDBUpdater::send_request_bucket_info(
uint16_t node,
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
@@ -135,116 +128,50 @@ BucketDBUpdater::sendRequestBucketInfo(
auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets);
- LOG(debug,
- "Sending request bucket info command %" PRIu64 " for "
- "bucket %s to node %u",
- msg->getMsgId(),
- bucket.toString().c_str(),
- node);
+ LOG(debug, "Sending request bucket info command %" PRIu64 " for bucket %s to node %u",
+ msg->getMsgId(), bucket.toString().c_str(), node);
msg->setPriority(50);
msg->setAddress(_node_ctx.node_address(node));
- _sentMessages[msg->getMsgId()] =
+ _sent_messages[msg->getMsgId()] =
BucketRequest(node, _op_ctx.generate_unique_timestamp(),
bucket, mergeReplyGuard);
_sender.sendCommand(msg);
}
void
-BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
- const document::Bucket& bucket)
-{
- sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
-}
-
-namespace {
-
-class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor {
- using NewEntries = std::vector<BucketDatabase::Entry>;
- NewEntries::const_iterator _current;
- const NewEntries::const_iterator _last;
-public:
- explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries)
- : _current(new_entries.cbegin()),
- _last(new_entries.cend())
- {}
-
- Result merge(BucketDatabase::Merger& m) override {
- const uint64_t key_to_insert = m.bucket_key();
- uint64_t key_at_cursor = 0;
- while (_current != _last) {
- key_at_cursor = _current->getBucketId().toKey();
- if (key_at_cursor >= key_to_insert) {
- break;
- }
- m.insert_before_current(_current->getBucketId(), *_current);
- ++_current;
- }
- if ((_current != _last) && (key_at_cursor == key_to_insert)) {
- // If we encounter a bucket that already exists, replace value wholesale.
- // Don't try to cleverly merge replicas, as the values we currently hold
- // in the read-only DB may be stale.
- // Note that this case shouldn't really happen, since we only add previously
- // owned buckets to the read-only DB, and subsequent adds to a non-empty DB
- // can only happen for state preemptions. Since ownership is not regained
- // before a state is stable, a bucket is only added once. But we handle it
- // anyway in case this changes at some point in the future.
- m.current_entry() = *_current;
- return Result::Update;
- }
- return Result::KeepUnchanged;
- }
-
- void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override {
- for (; _current != _last; ++_current) {
- inserter.insert_at_end(_current->getBucketId(), *_current);
- }
- }
-};
-
-}
-
-void
-BucketDBUpdater::removeSuperfluousBuckets(
- const lib::ClusterStateBundle& newState,
+BucketDBUpdater::remove_superfluous_buckets(
+ StripeAccessGuard& guard,
+ const lib::ClusterStateBundle& new_state,
bool is_distribution_config_change)
{
- const bool move_to_read_only_db = shouldDeferStateEnabling();
const char* up_states = _op_ctx.storage_node_up_states();
+ // TODO STRIPE explicit space -> config mapping, don't get via repo
+ // ... but we need to get the current cluster state per space..!
for (auto& elem : _op_ctx.bucket_space_repo()) {
- const auto& newDistribution(elem.second->getDistribution());
- const auto& oldClusterState(elem.second->getClusterState());
- const auto& new_cluster_state = newState.getDerivedClusterState(elem.first);
+ const auto& old_cluster_state(elem.second->getClusterState());
+ const auto& new_cluster_state = new_state.getDerivedClusterState(elem.first);
// Running a full DB sweep is expensive, so if the cluster state transition does
// not actually indicate that buckets should possibly be removed, we elide it entirely.
if (!is_distribution_config_change
- && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states))
+ && db_pruning_may_be_elided(old_cluster_state, *new_cluster_state, up_states))
{
LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'",
document::FixedBucketSpaces::to_string(elem.first).data(),
- oldClusterState.toString().c_str(), new_cluster_state->toString().c_str());
+ old_cluster_state.toString().c_str(), new_cluster_state->toString().c_str());
continue;
}
-
- auto& bucketDb(elem.second->getBucketDatabase());
- auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase());
-
- // Remove all buckets not belonging to this distributor, or
- // being on storage nodes that are no longer up.
- MergingNodeRemover proc(
- oldClusterState,
- *new_cluster_state,
- _node_ctx.node_index(),
- newDistribution,
- up_states,
- move_to_read_only_db);
-
- bucketDb.merge(proc);
- if (move_to_read_only_db) {
- ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
- readOnlyDb.merge(read_only_merger);
+ // TODO STRIPE should we also pass old state and distr config? Must ensure we're in sync with stripe...
+ // .. but config is set synchronously via the guard upon pending state creation edge
+ auto maybe_lost = guard.remove_superfluous_buckets(elem.first, *new_cluster_state, is_distribution_config_change);
+ if (maybe_lost.buckets != 0) {
+ LOGBM(info, "After cluster state change %s, %zu buckets no longer "
+ "have available replicas. %zu documents in these buckets will "
+ "be unavailable until nodes come back up",
+ old_cluster_state.getTextualDifference(*new_cluster_state).c_str(),
+ maybe_lost.buckets, maybe_lost.documents);
}
maybe_inject_simulated_db_pruning_delay();
}
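The maybe_lost report consumed above comes from the new potential_data_loss_report.h (listed in the diffstat but not shown in this excerpt); below is a sketch consistent with the two fields the logging uses, where the merge() helper is an assumed convenience rather than confirmed API:

// Sketch of the report type implied by maybe_lost.buckets /
// maybe_lost.documents above; the real definition lives in
// potential_data_loss_report.h.
struct PotentialDataLossReport {
    size_t buckets   = 0;
    size_t documents = 0;
    void merge(const PotentialDataLossReport& other) noexcept {
        buckets   += other.buckets;
        documents += other.documents;
    }
};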
@@ -271,63 +198,58 @@ BucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
}
void
-BucketDBUpdater::ensureTransitionTimerStarted()
+BucketDBUpdater::ensure_transition_timer_started()
{
// Don't overwrite start time if we're already processing a state, as
// that will make transition times appear artificially low.
- if (!hasPendingClusterState()) {
- _transitionTimer = framework::MilliSecTimer(
- _node_ctx.clock());
+ if (!has_pending_cluster_state()) {
+ _transition_timer = framework::MilliSecTimer(_node_ctx.clock());
}
}
void
-BucketDBUpdater::completeTransitionTimer()
+BucketDBUpdater::complete_transition_timer()
{
_distributor_interface.getMetrics()
- .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble());
+ .stateTransitionTime.addValue(_transition_timer.getElapsedTimeAsDouble());
}
void
-BucketDBUpdater::clearReadOnlyBucketRepoDatabases()
+BucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfigs& configs)
{
- for (auto& space : _op_ctx.read_only_bucket_space_repo()) {
- space.second->getBucketDatabase().clear();
- }
-}
+ ensure_transition_timer_started();
-void
-BucketDBUpdater::storageDistributionChanged()
-{
- ensureTransitionTimerStarted();
-
- removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true);
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ // FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?!
+ guard->update_distribution_config(configs);
+ remove_superfluous_buckets(*guard, _op_ctx.cluster_state_bundle(), true);
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
_op_ctx.cluster_state_bundle(),
_op_ctx.storage_node_up_states());
- _pendingClusterState = PendingClusterState::createForDistributionChange(
+ _pending_cluster_state = PendingClusterState::createForDistributionChange(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _op_ctx.bucket_space_repo(),
- _op_ctx.generate_unique_timestamp());
- _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
- _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ _op_ctx.bucket_space_repo(), // TODO STRIPE cannot use!
+ _op_ctx.generate_unique_timestamp()); // TODO STRIPE must ensure no stripes can generate < this
+ _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap();
+
+ guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle());
}
void
-BucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
+BucketDBUpdater::reply_to_previous_pending_cluster_state_if_any()
{
- if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
+ if (_pending_cluster_state.get() && _pending_cluster_state->hasCommand()) {
_distributor_interface.getMessageSender().sendUp(
- std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
+ std::make_shared<api::SetSystemStateReply>(*_pending_cluster_state->getCommand()));
}
}
void
-BucketDBUpdater::replyToActivationWithActualVersion(
+BucketDBUpdater::reply_to_activation_with_actual_version(
const api::ActivateClusterStateVersionCommand& cmd,
uint32_t actualVersion)
{
@@ -336,104 +258,49 @@ BucketDBUpdater::replyToActivationWithActualVersion(
_distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
}
-void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
- std::lock_guard lock(_distribution_context_mutex);
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- // At this point, we're still operating with a distribution context _without_ a
- // pending state, i.e. anyone using the context will expect to find buckets
- // in the DB that correspond to how the database looked like prior to pruning
- // buckets from the DB. To ensure this is not violated, take a snapshot of the
- // _mutable_ DB and expose this. This snapshot only lives until we atomically
- // flip to expose a distribution context that includes the new, pending state.
- // At that point, the read-only DB is known to contain the buckets that have
- // been pruned away, so we can release the mutable DB snapshot safely.
- // TODO test for, and handle, state preemption case!
- _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
- }
-}
-
-
-void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
- std::lock_guard lock(_distribution_context_mutex);
- const auto old_default_state = _op_ctx.bucket_space_repo().get(
- document::FixedBucketSpaces::default_space()).cluster_state_sp();
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- auto new_distribution = elem.second->distribution_sp();
- auto old_cluster_state = elem.second->cluster_state_sp();
- auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
- _active_distribution_contexts.insert_or_assign(
- elem.first,
- BucketSpaceDistributionContext::make_state_transition(
- std::move(old_cluster_state),
- old_default_state,
- std::move(new_cluster_state),
- std::move(new_distribution),
- _node_ctx.node_index()));
- // We can now remove the explicit mutable DB snapshot, as the buckets that have been
- // pruned away are visible in the read-only DB.
- _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
- }
-}
-
-void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
- std::lock_guard lock(_distribution_context_mutex);
- const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- auto new_distribution = elem.second->distribution_sp();
- auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
- _active_distribution_contexts.insert_or_assign(
- elem.first,
- BucketSpaceDistributionContext::make_stable_state(
- std::move(new_cluster_state),
- default_cluster_state,
- std::move(new_distribution),
- _node_ctx.node_index()));
- }
-}
-
bool
BucketDBUpdater::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& cmd)
{
- LOG(debug,
- "Received new cluster state %s",
+ LOG(debug, "Received new cluster state %s",
cmd->getSystemState().toString().c_str());
- const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
- if (state == oldState) {
+ if (state == _active_state_bundle) {
return false;
}
- ensureTransitionTimerStarted();
- // Separate timer since _transitionTimer might span multiple pending states.
+ ensure_transition_timer_started();
+ // Separate timer since _transition_timer might span multiple pending states.
framework::MilliSecTimer process_timer(_node_ctx.clock());
- update_read_snapshot_before_db_pruning();
+
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ guard->update_read_snapshot_before_db_pruning();
const auto& bundle = cmd->getClusterStateBundle();
- removeSuperfluousBuckets(bundle, false);
- update_read_snapshot_after_db_pruning(bundle);
- replyToPreviousPendingClusterStateIfAny();
+ remove_superfluous_buckets(*guard, bundle, false);
+ guard->update_read_snapshot_after_db_pruning(bundle);
+ reply_to_previous_pending_cluster_state_if_any();
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
_op_ctx.cluster_state_bundle(),
_op_ctx.storage_node_up_states());
- _pendingClusterState = PendingClusterState::createForClusterStateChange(
+ _pending_cluster_state = PendingClusterState::createForClusterStateChange(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _op_ctx.bucket_space_repo(),
+ _op_ctx.bucket_space_repo(), // TODO STRIPE remove
cmd,
- _outdatedNodesMap,
- _op_ctx.generate_unique_timestamp());
- _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+ _outdated_nodes_map,
+ _op_ctx.generate_unique_timestamp()); // FIXME STRIPE must be atomic across all threads
+ _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap();
_distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
- _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
- if (isPendingClusterStateCompleted()) {
- processCompletedPendingClusterState();
+ guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle());
+ if (is_pending_cluster_state_completed()) {
+ process_completed_pending_cluster_state(*guard);
}
return true;
}
@@ -441,25 +308,26 @@ BucketDBUpdater::onSetSystemState(
bool
BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
{
- if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
- const auto pending_version = _pendingClusterState->clusterStateVersion();
+ if (has_pending_cluster_state() && _pending_cluster_state->isVersionedTransition()) {
+ const auto pending_version = _pending_cluster_state->clusterStateVersion();
if (pending_version == cmd->version()) {
- if (isPendingClusterStateCompleted()) {
- assert(_pendingClusterState->isDeferred());
- activatePendingClusterState();
+ if (is_pending_cluster_state_completed()) {
+ assert(_pending_cluster_state->isDeferred());
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ activate_pending_cluster_state(*guard);
} else {
LOG(error, "Received cluster state activation for pending version %u "
"without pending state being complete yet. This is not expected, "
"as no activation should be sent before all distributors have "
"reported that state processing is complete.", pending_version);
- replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+ reply_to_activation_with_actual_version(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
return true;
}
} else {
- replyToActivationWithActualVersion(*cmd, pending_version);
+ reply_to_activation_with_actual_version(*cmd, pending_version);
return true;
}
- } else if (shouldDeferStateEnabling()) {
+ } else if (should_defer_state_enabling()) {
// Likely just a resend, but log warn for now to get a feel of how common it is.
LOG(warning, "Received cluster state activation command for version %u, which "
"has no corresponding pending state. Likely resent operation.", cmd->version());
@@ -471,6 +339,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa
return false;
}
+// TODO remove entirely from this abstraction level?
BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
{
if (_reply) {
@@ -488,71 +357,36 @@ BucketDBUpdater::onMergeBucketReply(
// actually merged (source-only nodes?) we request the bucket info of the
// bucket again to make sure it's ok.
for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
- sendRequestBucketInfo(reply->getNodes()[i].index,
- reply->getBucket(),
- replyGuard);
+ send_request_bucket_info(reply->getNodes()[i].index,
+ reply->getBucket(),
+ replyGuard);
}
return true;
}
void
-BucketDBUpdater::enqueueRecheckUntilPendingStateEnabled(
+BucketDBUpdater::enqueue_recheck_until_pending_state_enabled(
uint16_t node,
const document::Bucket& bucket)
{
- LOG(spam,
- "DB updater has a pending cluster state, enqueuing recheck "
- "of bucket %s on node %u until state is done processing",
- bucket.toString().c_str(),
- node);
- _enqueuedRechecks.insert(EnqueuedBucketRecheck(node, bucket));
+ LOG(spam, "DB updater has a pending cluster state, enqueuing recheck "
+ "of bucket %s on node %u until state is done processing",
+ bucket.toString().c_str(), node);
+ _enqueued_rechecks.insert(EnqueuedBucketRecheck(node, bucket));
}
void
-BucketDBUpdater::sendAllQueuedBucketRechecks()
+BucketDBUpdater::send_all_queued_bucket_rechecks()
{
- LOG(spam,
- "Sending %zu queued bucket rechecks previously received "
- "via NotifyBucketChange commands",
- _enqueuedRechecks.size());
+ LOG(spam, "Sending %zu queued bucket rechecks previously received "
+ "via NotifyBucketChange commands",
+ _enqueued_rechecks.size());
- for (const auto & entry :_enqueuedRechecks) {
- sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
+ for (const auto & entry :_enqueued_rechecks) {
+ send_request_bucket_info(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
}
- _enqueuedRechecks.clear();
-}
-
-bool
-BucketDBUpdater::onNotifyBucketChange(
- const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd)
-{
- // Immediately schedule reply to ensure it is sent.
- _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd));
-
- if (!cmd->getBucketInfo().valid()) {
- LOG(error,
- "Received invalid bucket info for bucket %s from notify bucket "
- "change! Not updating bucket.",
- cmd->getBucketId().toString().c_str());
- return true;
- }
- LOG(debug,
- "Received notify bucket change from node %u for bucket %s with %s.",
- cmd->getSourceIndex(),
- cmd->getBucketId().toString().c_str(),
- cmd->getBucketInfo().toString().c_str());
-
- if (hasPendingClusterState()) {
- enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(),
- cmd->getBucket());
- } else {
- sendRequestBucketInfo(cmd->getSourceIndex(),
- cmd->getBucket(),
- std::shared_ptr<MergeReplyGuard>());
- }
-
- return true;
+ _enqueued_rechecks.clear();
}
bool sort_pred(const BucketListMerger::BucketEntry& left,
@@ -563,34 +397,34 @@ bool sort_pred(const BucketListMerger::BucketEntry& left,
bool
BucketDBUpdater::onRequestBucketInfoReply(
- const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
- if (pendingClusterStateAccepted(repl)) {
+ if (pending_cluster_state_accepted(repl)) {
return true;
}
- return processSingleBucketInfoReply(repl);
+ return process_single_bucket_info_reply(repl);
}
bool
-BucketDBUpdater::pendingClusterStateAccepted(
- const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+BucketDBUpdater::pending_cluster_state_accepted(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
- if (_pendingClusterState.get()
- && _pendingClusterState->onRequestBucketInfoReply(repl))
+ if (_pending_cluster_state.get()
+ && _pending_cluster_state->onRequestBucketInfoReply(repl))
{
- if (isPendingClusterStateCompleted()) {
- processCompletedPendingClusterState();
+ if (is_pending_cluster_state_completed()) {
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
+ process_completed_pending_cluster_state(*guard);
}
return true;
}
- LOG(spam,
- "Reply %s was not accepted by pending cluster state",
+ LOG(spam, "Reply %s was not accepted by pending cluster state",
repl->toString().c_str());
return false;
}
void
-BucketDBUpdater::handleSingleBucketInfoFailure(
+BucketDBUpdater::handle_single_bucket_info_failure(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req)
{
@@ -600,31 +434,31 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
if (req.bucket.getBucketId() != document::BucketId(0)) {
framework::MilliSecTime sendTime(_node_ctx.clock());
sendTime += framework::MilliSecTime(100);
- _delayedRequests.emplace_back(sendTime, req);
+ _delayed_requests.emplace_back(sendTime, req);
}
}
void
-BucketDBUpdater::resendDelayedMessages()
+BucketDBUpdater::resend_delayed_messages()
{
- if (_pendingClusterState) {
- _pendingClusterState->resendDelayedMessages();
+ if (_pending_cluster_state) {
+ _pending_cluster_state->resendDelayedMessages();
}
- if (_delayedRequests.empty()) {
+ if (_delayed_requests.empty()) {
return; // Don't fetch time if not needed
}
framework::MilliSecTime currentTime(_node_ctx.clock());
- while (!_delayedRequests.empty()
- && currentTime >= _delayedRequests.front().first)
+ while (!_delayed_requests.empty()
+ && currentTime >= _delayed_requests.front().first)
{
- BucketRequest& req(_delayedRequests.front().second);
- sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
- _delayedRequests.pop_front();
+ BucketRequest& req(_delayed_requests.front().second);
+ send_request_bucket_info(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
+ _delayed_requests.pop_front();
}
}
void
-BucketDBUpdater::convertBucketInfoToBucketList(
+BucketDBUpdater::convert_bucket_info_to_bucket_list(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
uint16_t targetNode, BucketListMerger::BucketList& newList)
{
@@ -637,51 +471,51 @@ BucketDBUpdater::convertBucketInfoToBucketList(
}
void
-BucketDBUpdater::mergeBucketInfoWithDatabase(
+BucketDBUpdater::merge_bucket_info_with_database(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req)
{
BucketListMerger::BucketList existing;
BucketListMerger::BucketList newList;
- findRelatedBucketsInDatabase(req.targetNode, req.bucket, existing);
- convertBucketInfoToBucketList(repl, req.targetNode, newList);
+ find_related_buckets_in_database(req.targetNode, req.bucket, existing);
+ convert_bucket_info_to_bucket_list(repl, req.targetNode, newList);
std::sort(existing.begin(), existing.end(), sort_pred);
std::sort(newList.begin(), newList.end(), sort_pred);
BucketListMerger merger(newList, existing, req.timestamp);
- updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger);
+ update_database(req.bucket.getBucketSpace(), req.targetNode, merger);
}
bool
-BucketDBUpdater::processSingleBucketInfoReply(
+BucketDBUpdater::process_single_bucket_info_reply(
const std::shared_ptr<api::RequestBucketInfoReply> & repl)
{
- auto iter = _sentMessages.find(repl->getMsgId());
+ auto iter = _sent_messages.find(repl->getMsgId());
// Has probably been deleted for some reason earlier.
- if (iter == _sentMessages.end()) {
+ if (iter == _sent_messages.end()) {
return true;
}
BucketRequest req = iter->second;
- _sentMessages.erase(iter);
+ _sent_messages.erase(iter);
if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
// Ignore replies from nodes that are down.
return true;
}
if (repl->getResult().getResult() != api::ReturnCode::OK) {
- handleSingleBucketInfoFailure(repl, req);
+ handle_single_bucket_info_failure(repl, req);
return true;
}
- mergeBucketInfoWithDatabase(repl, req);
+ merge_bucket_info_with_database(repl, req);
return true;
}
void
-BucketDBUpdater::addBucketInfoForNode(
+BucketDBUpdater::add_bucket_info_for_node(
const BucketDatabase::Entry& e,
uint16_t node,
BucketListMerger::BucketList& existing) const
@@ -693,20 +527,20 @@ BucketDBUpdater::addBucketInfoForNode(
}
void
-BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
- BucketListMerger::BucketList& existing)
+BucketDBUpdater::find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing)
{
auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
std::vector<BucketDatabase::Entry> entries;
distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
for (const BucketDatabase::Entry & entry : entries) {
- addBucketInfoForNode(entry, node, existing);
+ add_bucket_info_for_node(entry, node, existing);
}
}
void
-BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
+BucketDBUpdater::update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
{
for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
document::Bucket bucket(bucketSpace, bucketId);
@@ -723,18 +557,18 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node
}
bool
-BucketDBUpdater::isPendingClusterStateCompleted() const
+BucketDBUpdater::is_pending_cluster_state_completed() const
{
- return _pendingClusterState.get() && _pendingClusterState->done();
+ return _pending_cluster_state.get() && _pending_cluster_state->done();
}
void
-BucketDBUpdater::processCompletedPendingClusterState()
+BucketDBUpdater::process_completed_pending_cluster_state(StripeAccessGuard& guard)
{
- if (_pendingClusterState->isDeferred()) {
+ if (_pending_cluster_state->isDeferred()) {
LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
- _pendingClusterState->clusterStateVersion());
- assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+ _pending_cluster_state->clusterStateVersion());
+ assert(_pending_cluster_state->hasCommand()); // Deferred transitions should only ever be created by state commands.
// Sending down SetSystemState command will reach the state manager and a reply
// will be auto-sent back to the cluster controller in charge. Once this happens,
// it will send an explicit activation command once all distributors have reported
@@ -743,73 +577,81 @@ BucketDBUpdater::processCompletedPendingClusterState()
// taken effect via activation. External operation handler will keep operations from
// actually being scheduled until state has been activated. The external operation handler
// needs to be explicitly aware of the case where no state has yet to be activated.
- _distributor_interface.getMessageSender().sendDown(
- _pendingClusterState->getCommand());
- _pendingClusterState->clearCommand();
+ _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand());
+ _pending_cluster_state->clearCommand();
return;
}
// Distribution config change or non-deferred cluster state. Immediately activate
// the pending state without being told to do so explicitly.
- activatePendingClusterState();
+ activate_pending_cluster_state(guard);
}
void
-BucketDBUpdater::activatePendingClusterState()
+BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard)
{
framework::MilliSecTimer process_timer(_node_ctx.clock());
- _pendingClusterState->mergeIntoBucketDatabases();
+ _pending_cluster_state->merge_into_bucket_databases(guard);
maybe_inject_simulated_db_merging_delay();
- if (_pendingClusterState->isVersionedTransition()) {
- LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
- enableCurrentClusterStateBundleInDistributor();
- if (_pendingClusterState->hasCommand()) {
- _distributor_interface.getMessageSender().sendDown(
- _pendingClusterState->getCommand());
+ if (_pending_cluster_state->isVersionedTransition()) {
+ LOG(debug, "Activating pending cluster state version %u", _pending_cluster_state->clusterStateVersion());
+ enable_current_cluster_state_bundle_in_distributor_and_stripes(guard);
+ if (_pending_cluster_state->hasCommand()) {
+ _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand());
}
- addCurrentStateToClusterStateHistory();
+ add_current_state_to_cluster_state_history();
} else {
LOG(debug, "Activating pending distribution config");
// TODO distribution changes cannot currently be deferred as they are not
// initiated by the cluster controller!
- _distributor_interface.notifyDistributionChangeEnabled();
+ _distributor_interface.notifyDistributionChangeEnabled(); // TODO factor these two out into one func?
+ guard.notify_distribution_change_enabled();
}
- update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
- _pendingClusterState.reset();
- _outdatedNodesMap.clear();
- _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle();
- sendAllQueuedBucketRechecks();
- completeTransitionTimer();
- clearReadOnlyBucketRepoDatabases();
+ guard.update_read_snapshot_after_activation(_pending_cluster_state->getNewClusterStateBundle());
+ _pending_cluster_state.reset();
+ _outdated_nodes_map.clear();
+ guard.clear_pending_cluster_state_bundle();
+ send_all_queued_bucket_rechecks();
+ complete_transition_timer();
+ guard.clear_read_only_bucket_repo_databases();
_distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
}
void
-BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
+BucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard)
{
- const lib::ClusterStateBundle& state(
- _pendingClusterState->getNewClusterStateBundle());
+ const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle();
- LOG(debug,
- "BucketDBUpdater finished processing state %s",
+ _active_state_bundle = _pending_cluster_state->getNewClusterStateBundle();
+ propagate_active_state_bundle_internally();
+
+ LOG(debug, "BucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
+ // First enable the cluster state for the _top-level_ distributor component.
_distributor_interface.enableClusterStateBundle(state);
+    // Then enable it for all underlying stripes. Technically the order doesn't matter
+ // since all threads are blocked at this point.
+ guard.enable_cluster_state_bundle(state);
}
void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
- update_read_snapshot_after_activation(activated_state);
+ auto guard = _stripe_accessor.rendezvous_and_hold_all();
_distributor_interface.enableClusterStateBundle(activated_state);
+ guard->enable_cluster_state_bundle(activated_state);
+
+ _active_state_bundle = activated_state;
+ propagate_active_state_bundle_internally();
}
void
-BucketDBUpdater::addCurrentStateToClusterStateHistory()
+BucketDBUpdater::add_current_state_to_cluster_state_history()
{
- _history.push_back(_pendingClusterState->getSummary());
+ _history.push_back(_pending_cluster_state->getSummary());
if (_history.size() > 50) {
_history.pop_front();
@@ -857,22 +699,22 @@ BucketDBUpdater::reportStatus(std::ostream& out,
xos << XmlTag("status")
<< XmlAttribute("id", BUCKETDB)
<< XmlAttribute("name", BUCKETDB_UPDATER);
- reportXmlStatus(xos, path);
+ report_xml_status(xos, path);
xos << XmlEndTag();
return true;
}
vespalib::string
-BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
- const framework::HttpUrlPath&) const
+BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos,
+ const framework::HttpUrlPath&) const
{
using namespace vespalib::xml;
xos << XmlTag("bucketdb")
<< XmlTag("systemstate_active")
<< XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
<< XmlEndTag();
- if (_pendingClusterState) {
- xos << *_pendingClusterState;
+ if (_pending_cluster_state) {
+ xos << *_pending_cluster_state;
}
xos << XmlTag("systemstate_history");
for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) {
@@ -884,13 +726,13 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
}
xos << XmlEndTag()
<< XmlTag("single_bucket_requests");
- for (const auto & entry : _sentMessages)
+ for (const auto & entry : _sent_messages)
{
entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp));
}
xos << XmlEndTag()
<< XmlTag("delayed_single_bucket_requests");
- for (const auto & entry : _delayedRequests)
+ for (const auto & entry : _delayed_requests)
{
entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime()));
}
@@ -898,166 +740,4 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
return "";
}
-BucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
- const lib::ClusterState& oldState,
- const lib::ClusterState& s,
- uint16_t localIndex,
- const lib::Distribution& distribution,
- const char* upStates,
- bool track_non_owned_entries)
- : _oldState(oldState),
- _state(s),
- _available_nodes(),
- _nonOwnedBuckets(),
- _removed_buckets(0),
- _removed_documents(0),
- _localIndex(localIndex),
- _distribution(distribution),
- _upStates(upStates),
- _track_non_owned_entries(track_non_owned_entries),
- _cachedDecisionSuperbucket(UINT64_MAX),
- _cachedOwned(false)
-{
- // TODO intersection of cluster state and distribution config
- const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
- _available_nodes.resize(storage_count);
- for (uint16_t i = 0; i < storage_count; ++i) {
- if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) {
- _available_nodes[i] = true;
- }
- }
-}
-
-void
-BucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
-{
- LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
-}
-
-namespace {
-
-uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
- // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
- return id.getRawId() & ~(UINT64_MAX << distribution_bits);
-}
-
-}
-
-bool
-BucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
- const document::BucketId& bucketId) const
-{
- // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
- // TODO "too few bits used" case can be cheaply checked without needing exception
- try {
- const auto bits = _state.getDistributionBitCount();
- const auto this_superbucket = superbucket_from_id(bucketId, bits);
- if (_cachedDecisionSuperbucket == this_superbucket) {
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor (cached)");
- }
- return _cachedOwned;
- }
-
- uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim");
- _cachedDecisionSuperbucket = this_superbucket;
- _cachedOwned = (distributor == _localIndex);
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor");
- return false;
- }
- return true;
- } catch (lib::TooFewBucketBitsInUseException& exc) {
- logRemove(bucketId, "using too few distribution bits now");
- } catch (lib::NoDistributorsAvailableException& exc) {
- logRemove(bucketId, "no distributors are available");
- }
- return false;
-}
-
-void
-BucketDBUpdater::MergingNodeRemover::setCopiesInEntry(
- BucketDatabase::Entry& e,
- const std::vector<BucketCopy>& copies) const
-{
- e->clear();
-
- std::vector<uint16_t> order =
- _distribution.getIdealStorageNodes(_state, e.getBucketId(), _upStates);
-
- e->addNodes(copies, order);
-
- LOG(spam, "Changed %s", e->toString().c_str());
-}
-
-bool
-BucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const
-{
- const uint16_t n_nodes = e->getNodeCount();
- for (uint16_t i = 0; i < n_nodes; i++) {
- const uint16_t node_idx = e->getNodeRef(i).getNode();
- if (!storage_node_is_available(node_idx)) {
- return true;
- }
- }
- return false;
-}
-
-BucketDatabase::MergingProcessor::Result
-BucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger)
-{
- document::BucketId bucketId(merger.bucket_id());
- LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str());
- if (!distributorOwnsBucket(bucketId)) {
- // TODO remove in favor of DB snapshotting
- if (_track_non_owned_entries) {
- _nonOwnedBuckets.emplace_back(merger.current_entry());
- }
- return Result::Skip;
- }
- auto& e = merger.current_entry();
-
- if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger?
- return Result::Skip;
- }
-
- if (!has_unavailable_nodes(e)) {
- return Result::KeepUnchanged;
- }
-
- std::vector<BucketCopy> remainingCopies;
- for (uint16_t i = 0; i < e->getNodeCount(); i++) {
- const uint16_t node_idx = e->getNodeRef(i).getNode();
- if (storage_node_is_available(node_idx)) {
- remainingCopies.push_back(e->getNodeRef(i));
- }
- }
-
- if (remainingCopies.empty()) {
- ++_removed_buckets;
- _removed_documents += e->getHighestDocumentCount();
- return Result::Skip;
- } else {
- setCopiesInEntry(e, remainingCopies);
- return Result::Update;
- }
-}
-
-bool
-BucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept
-{
- return ((index < _available_nodes.size()) && _available_nodes[index]);
-}
-
-BucketDBUpdater::MergingNodeRemover::~MergingNodeRemover()
-{
- if (_removed_buckets != 0) {
- LOGBM(info, "After cluster state change %s, %zu buckets no longer "
- "have available replicas. %zu documents in these buckets will "
- "be unavailable until nodes come back up",
- _oldState.getTextualDifference(_state).c_str(),
- _removed_buckets, _removed_documents);
- }
-}
-
} // distributor
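The MergingNodeRemover removed here (it moves into StripeBucketDBUpdater) contains a micro-optimization worth spelling out: bucket ownership depends only on the superbucket, i.e. the distribution_bits low bits of the raw bucket ID, and the database is traversed in key order, which keeps buckets of the same superbucket adjacent. Caching the last ownership decision therefore avoids recomputing the ideal distributor for almost every bucket. A self-contained sketch of the idea follows; ideal_distributor() is a trivial stand-in for lib::Distribution::getIdealDistributorNode(), and all names are illustrative.

    #include <cstdint>
    #include <cassert>

    // Stand-in for lib::Distribution::getIdealDistributorNode(); real placement
    // is far more involved. Here: trivial modulo over the distributor count.
    uint16_t ideal_distributor(uint64_t superbucket, uint16_t num_distributors) {
        return static_cast<uint16_t>(superbucket % num_distributors);
    }

    // The distribution_bits least significant bits of a raw bucket ID hold the
    // superbucket number; mask off everything above them.
    uint64_t superbucket_from_raw_id(uint64_t raw_id, uint16_t distribution_bits) {
        return raw_id & ~(UINT64_MAX << distribution_bits);
    }

    class CachedOwnershipChecker {
        uint16_t _local_index;
        uint16_t _num_distributors;
        uint16_t _distribution_bits;
        uint64_t _cached_superbucket = UINT64_MAX; // sentinel: nothing cached yet
        bool     _cached_owned = false;
    public:
        CachedOwnershipChecker(uint16_t local, uint16_t n, uint16_t bits)
            : _local_index(local), _num_distributors(n), _distribution_bits(bits) {}

        // Ownership is constant within a superbucket, and the DB is iterated in
        // sorted order, so consecutive buckets usually hit the cache.
        bool owns(uint64_t raw_bucket_id) {
            const uint64_t sb = superbucket_from_raw_id(raw_bucket_id, _distribution_bits);
            if (sb != _cached_superbucket) {
                _cached_superbucket = sb;
                _cached_owned = (ideal_distributor(sb, _num_distributors) == _local_index);
            }
            return _cached_owned;
        }
    };

    int main() {
        CachedOwnershipChecker checker(1, 4, 16);
        // Same low 16 bits => same superbucket => identical (cached) decision.
        assert(checker.owns(0x4000000000010005ULL) == checker.owns(0x4000000000020005ULL));
    }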
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 375a5cee4e7..1a4677ee507 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -26,8 +26,11 @@ class XmlAttribute;
namespace storage::distributor {
-class DistributorStripeInterface;
+class BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
+class DistributorStripeInterface;
+class StripeAccessor;
+class StripeAccessGuard;
class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
@@ -35,41 +38,31 @@ class BucketDBUpdater : public framework::StatusReporter,
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
BucketDBUpdater(DistributorStripeInterface& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
- DistributorComponentRegister& compReg);
+ DistributorComponentRegister& comp_reg,
+ StripeAccessor& stripe_accessor);
~BucketDBUpdater() override;
void flush();
- const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const;
- void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
- bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
- void resendDelayedMessages();
- void storageDistributionChanged();
- vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+
+ void resend_delayed_messages();
+ void storage_distribution_changed(const BucketSpaceDistributionConfigs& configs);
+ void bootstrap_distribution_config(std::shared_ptr<const lib::Distribution>);
+
+ vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const;
+
void print(std::ostream& out, bool verbose, const std::string& indent) const;
const DistributorNodeContext& node_context() const { return _node_ctx; }
DistributorOperationContext& operation_context() { return _op_ctx; }
- /**
- * Returns whether the current PendingClusterState indicates that there has
- * been a transfer of bucket ownership amongst the distributors in the
- * cluster. This method only makes sense to call when _pendingClusterState
- * is active, such as from within a enableClusterState() call.
- */
- bool bucketOwnershipHasChanged() const {
- return ((_pendingClusterState.get() != nullptr)
- && _pendingClusterState->hasBucketOwnershipTransfer());
- }
void set_stale_reads_enabled(bool enabled) noexcept {
_stale_reads_enabled.store(enabled, std::memory_order_relaxed);
}
@@ -77,7 +70,6 @@ public:
return _stale_reads_enabled.load(std::memory_order_relaxed);
}
- OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
private:
class MergeReplyGuard {
public:
@@ -141,126 +133,78 @@ private:
// Transitively invokes Distributor::enableClusterStateBundle
void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
- bool shouldDeferStateEnabling() const noexcept;
- bool hasPendingClusterState() const;
- bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
- bool isPendingClusterStateCompleted() const;
- void processCompletedPendingClusterState();
- void activatePendingClusterState();
- void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
- void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- uint16_t targetNode, BucketListMerger::BucketList& newList);
- void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
- const std::shared_ptr<MergeReplyGuard>& mergeReply);
- void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
- BucketListMerger::BucketList& existing) const;
- void ensureTransitionTimerStarted();
- void completeTransitionTimer();
- void clearReadOnlyBucketRepoDatabases();
+ bool should_defer_state_enabling() const noexcept;
+ bool has_pending_cluster_state() const;
+ bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ bool process_single_bucket_info_reply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void handle_single_bucket_info_failure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ bool is_pending_cluster_state_completed() const;
+ void process_completed_pending_cluster_state(StripeAccessGuard& guard);
+ void activate_pending_cluster_state(StripeAccessGuard& guard);
+ void merge_bucket_info_with_database(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ void convert_bucket_info_to_bucket_list(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList);
+ void send_request_bucket_info(uint16_t node, const document::Bucket& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard);
+ void add_bucket_info_for_node(const BucketDatabase::Entry& e, uint16_t node,
+ BucketListMerger::BucketList& existing) const;
+ void ensure_transition_timer_started();
+ void complete_transition_timer();
/**
* Adds all buckets contained in the bucket database
* that are either contained
* in bucketId, or that bucketId is contained in, that have copies
* on the given node.
*/
- void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
- BucketListMerger::BucketList& existing);
+ void find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing);
/**
Updates the bucket database from the information generated by the given
bucket list merger.
*/
- void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
+ void update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
- void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+ void remove_superfluous_buckets(StripeAccessGuard& guard,
+ const lib::ClusterStateBundle& new_state,
+ bool is_distribution_config_change);
- void update_read_snapshot_before_db_pruning();
- void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
- bool is_distribution_config_change);
- void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
- void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
-
- void replyToPreviousPendingClusterStateIfAny();
- void replyToActivationWithActualVersion(
+ void reply_to_previous_pending_cluster_state_if_any();
+ void reply_to_activation_with_actual_version(
const api::ActivateClusterStateVersionCommand& cmd,
uint32_t actualVersion);
- void enableCurrentClusterStateBundleInDistributor();
- void addCurrentStateToClusterStateHistory();
- void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
- void sendAllQueuedBucketRechecks();
+ void enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard);
+ void add_current_state_to_cluster_state_history();
+ void enqueue_recheck_until_pending_state_enabled(uint16_t node, const document::Bucket& bucket);
+ void send_all_queued_bucket_rechecks();
+
+ void propagate_active_state_bundle_internally();
void maybe_inject_simulated_db_pruning_delay();
void maybe_inject_simulated_db_merging_delay();
- /**
- Removes all copies of buckets that are on nodes that are down.
- */
- class MergingNodeRemover : public BucketDatabase::MergingProcessor {
- public:
- MergingNodeRemover(const lib::ClusterState& oldState,
- const lib::ClusterState& s,
- uint16_t localIndex,
- const lib::Distribution& distribution,
- const char* upStates,
- bool track_non_owned_entries);
- ~MergingNodeRemover() override;
-
- Result merge(BucketDatabase::Merger&) override;
- void logRemove(const document::BucketId& bucketId, const char* msg) const;
- bool distributorOwnsBucket(const document::BucketId&) const;
-
- const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
- return _nonOwnedBuckets;
- }
- private:
- void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
-
- bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
- bool storage_node_is_available(uint16_t index) const noexcept;
-
- const lib::ClusterState _oldState;
- const lib::ClusterState _state;
- std::vector<bool> _available_nodes;
- std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
- size_t _removed_buckets;
- size_t _removed_documents;
-
- uint16_t _localIndex;
- const lib::Distribution& _distribution;
- const char* _upStates;
- bool _track_non_owned_entries;
-
- mutable uint64_t _cachedDecisionSuperbucket;
- mutable bool _cachedOwned;
- };
+ // TODO STRIPE remove once distributor component dependencies have been pruned
+ StripeAccessor& _stripe_accessor;
+ lib::ClusterStateBundle _active_state_bundle;
+ std::unique_ptr<DistributorBucketSpaceRepo> _dummy_mutable_bucket_space_repo;
+ std::unique_ptr<DistributorBucketSpaceRepo> _dummy_read_only_bucket_space_repo;
- DistributorStripeComponent _distributorComponent;
+ DistributorStripeComponent _distributor_component;
const DistributorNodeContext& _node_ctx;
DistributorOperationContext& _op_ctx;
DistributorStripeInterface& _distributor_interface;
- std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
- std::map<uint64_t, BucketRequest> _sentMessages;
- std::unique_ptr<PendingClusterState> _pendingClusterState;
+ std::deque<std::pair<framework::MilliSecTime, BucketRequest>> _delayed_requests;
+ std::map<uint64_t, BucketRequest> _sent_messages;
+ std::unique_ptr<PendingClusterState> _pending_cluster_state;
std::list<PendingClusterState::Summary> _history;
DistributorMessageSender& _sender;
- std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
- OutdatedNodesMap _outdatedNodesMap;
- framework::MilliSecTimer _transitionTimer;
+ std::set<EnqueuedBucketRecheck> _enqueued_rechecks;
+ OutdatedNodesMap _outdated_nodes_map;
+ framework::MilliSecTimer _transition_timer;
std::atomic<bool> _stale_reads_enabled;
- using DistributionContexts = std::unordered_map<document::BucketSpace,
- std::shared_ptr<BucketSpaceDistributionContext>,
- document::BucketSpace::hash>;
- DistributionContexts _active_distribution_contexts;
- using DbGuards = std::unordered_map<document::BucketSpace,
- std::shared_ptr<BucketDatabase::ReadGuard>,
- document::BucketSpace::hash>;
- DbGuards _explicit_transition_read_guard;
- mutable std::mutex _distribution_context_mutex;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index f886640d1f9..0f63de5cd85 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -1,12 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
//
#include "blockingoperationstarter.h"
+#include "bucket_space_distribution_configs.h"
+#include "bucketdbupdater.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributor_status.h"
#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "legacy_single_stripe_accessor.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
@@ -47,16 +50,21 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
ChainedMessageSender* messageSender)
: StorageLink("distributor"),
framework::StatusReporter("distributor", "Distributor"),
+ _comp_reg(compReg),
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler,
manageActiveBucketCopies, *this)),
+ _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
_component(compReg, "distributor"),
+ _bucket_db_updater(),
_distributorStatusDelegate(compReg, *this, *this),
_threadPool(threadPool),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_metricUpdateHook(*this),
- _hostInfoReporter(*this, *this)
+ _hostInfoReporter(*this, *this),
+ _distribution(),
+ _next_distribution()
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
@@ -113,12 +121,12 @@ Distributor::distributor_component() noexcept {
return _stripe->_component;
}
-BucketDBUpdater&
+StripeBucketDBUpdater&
Distributor::bucket_db_updater() {
return _stripe->bucket_db_updater();
}
-const BucketDBUpdater&
+const StripeBucketDBUpdater&
Distributor::bucket_db_updater() const {
return _stripe->bucket_db_updater();
}
@@ -188,6 +196,9 @@ Distributor::onOpen()
void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
_stripe->close();
+ if (_bucket_db_updater) {
+ _bucket_db_updater->flush();
+ }
}
void
@@ -210,18 +221,47 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg)
}
}
+namespace {
+
+bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& msg) noexcept {
+ switch (msg.getType().getId()) {
+ case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
+ return true;
+ case api::MessageType::REQUESTBUCKETINFO_REPLY_ID:
+ // Top-level component should only handle replies for full bucket info fetches.
+ // Bucket-specific requests should go to the stripes that sent them.
+ return dynamic_cast<const api::RequestBucketInfoReply&>(msg).full_bucket_fetch();
+ default:
+ return false;
+ }
+}
+
+}
+
bool
Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
+ // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
+ // regardless of what RPC thread (comm mgr, FRT...) this is called from!
+ if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
+ return msg->callHandler(*_bucket_db_updater, msg);
+ }
+ // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
+ // that covers most operations already...
return _stripe->onDown(msg);
}
bool
Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
+ if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
+ return reply->callHandler(*_bucket_db_updater, reply);
+ }
return _stripe->handleReply(reply);
}
+// TODO STRIPE we need to reintroduce the top-level message queue...
bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
@@ -245,21 +285,42 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
void
Distributor::storageDistributionChanged()
{
- // May happen from any thread.
- _stripe->storageDistributionChanged();
+ if (_bucket_db_updater) {
+ if (!_distribution || (*_component.getDistribution() != *_distribution)) {
+ LOG(debug, "Distribution changed to %s, must re-fetch bucket information",
+ _component.getDistribution()->toString().c_str());
+ _next_distribution = _component.getDistribution(); // FIXME this is not thread safe
+ } else {
+ LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s",
+ _component.getDistribution()->toString().c_str(),
+ _distribution->toString().c_str());
+ }
+ } else {
+ // May happen from any thread.
+ _stripe->storageDistributionChanged();
+ }
}
void
Distributor::enableNextDistribution()
{
- _stripe->enableNextDistribution();
+ if (_next_distribution && _bucket_db_updater) {
+ _distribution = _next_distribution;
+ _next_distribution = std::shared_ptr<lib::Distribution>();
+ auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution);
+ _bucket_db_updater->storage_distribution_changed(new_configs);
+ } else {
+ _stripe->enableNextDistribution();
+ }
}
// TODO STRIPE only used by tests to directly inject new distribution config
+// - actually, also by ctor
void
Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
+ // TODO STRIPE top-level bucket DB updater
_stripe->propagateDefaultDistribution(std::move(distribution));
}
@@ -299,6 +360,9 @@ framework::ThreadWaitInfo
Distributor::doCriticalTick(framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+ if (_bucket_db_updater) {
+ enableNextDistribution();
+ }
// Propagates any new configs down to stripe(s)
enableNextConfig();
_stripe->doCriticalTick(idx);
@@ -309,6 +373,9 @@ Distributor::doCriticalTick(framework::ThreadIndex idx)
framework::ThreadWaitInfo
Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
+ if (_bucket_db_updater) {
+ _bucket_db_updater->resend_delayed_messages();
+ }
// TODO STRIPE stripes need their own thread loops!
_stripe->doNonCriticalTick(idx);
_tickResult = _stripe->_tickResult;
@@ -318,6 +385,11 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
void
Distributor::enableNextConfig()
{
+ // FIXME STRIPE enforce this cannot happen live, only valid for startup config edge
+ if (getConfig().num_distributor_stripes() > 0 && !_bucket_db_updater) {
+ // FIXME STRIPE using the singular stripe here is a temporary Hack McHack Deluxe 3000!
+ _bucket_db_updater = std::make_unique<BucketDBUpdater>(*_stripe, *_stripe, _comp_reg, *_stripe_accessor);
+ }
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
_stripe->enableNextConfig(); // TODO STRIPE avoid redundant call
}
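storageDistributionChanged() can run on an arbitrary config/RPC thread while enableNextDistribution() runs in the critical tick, and the FIXME above notes that the plain _next_distribution write is not thread safe. One conventional way to make such a handoff safe is a mutex-guarded pending slot that only the tick thread drains; the sketch below shows that shape under assumed names (ConfigHandoff and Config stand in for the distributor and lib::Distribution; this is not the committed fix).

    #include <memory>
    #include <mutex>

    struct Config { int version; }; // placeholder for lib::Distribution

    class ConfigHandoff {
        std::mutex _lock;
        std::shared_ptr<const Config> _next;    // written by any config thread
        std::shared_ptr<const Config> _current; // only touched by the tick thread
    public:
        // Called from arbitrary threads when new config arrives.
        void publish(std::shared_ptr<const Config> cfg) {
            std::lock_guard guard(_lock);
            _next = std::move(cfg);
        }
        // Called from the critical tick; adopts any pending config.
        // Returns true if the active config changed.
        bool adopt_pending() {
            std::shared_ptr<const Config> next;
            {
                std::lock_guard guard(_lock);
                next = std::move(_next);
            }
            if (!next) {
                return false;
            }
            _current = std::move(next);
            return true;
        }
        const Config* current() const { return _current.get(); }
    };

    int main() {
        ConfigHandoff h;
        h.publish(std::make_shared<const Config>(Config{1}));
        return h.adopt_pending() ? 0 : 1;
    }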
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index bfffe126b44..9ad6425afe9 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -11,6 +11,7 @@
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
+#include "stripe_bucket_db_updater.h" // TODO this is temporary
#include <vespa/config/config.h>
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
@@ -33,10 +34,12 @@ namespace storage::distributor {
class BlockingOperationStarter;
class BucketPriorityDatabase;
+class BucketDBUpdater;
class DistributorBucketSpaceRepo;
class DistributorStatus;
class DistributorStripe;
class OperationSequencer;
+class LegacySingleStripeAccessor;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
@@ -135,8 +138,8 @@ private:
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
// Accessors used by tests
- BucketDBUpdater& bucket_db_updater();
- const BucketDBUpdater& bucket_db_updater() const;
+ StripeBucketDBUpdater& bucket_db_updater();
+ const StripeBucketDBUpdater& bucket_db_updater() const;
IdealStateManager& ideal_state_manager();
const IdealStateManager& ideal_state_manager() const;
ExternalOperationHandler& external_operation_handler();
@@ -162,16 +165,22 @@ private:
void enableNextDistribution();
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+ DistributorComponentRegister& _comp_reg;
std::shared_ptr<DistributorMetricSet> _metrics;
ChainedMessageSender* _messageSender;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
+ std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;
storage::DistributorComponent _component;
+ std::unique_ptr<BucketDBUpdater> _bucket_db_updater;
StatusReporterDelegate _distributorStatusDelegate;
framework::TickingThreadPool& _threadPool;
framework::ThreadWaitInfo _tickResult;
MetricUpdateHook _metricUpdateHook;
DistributorHostInfoReporter _hostInfoReporter;
+
+ std::shared_ptr<lib::Distribution> _distribution;
+ std::shared_ptr<lib::Distribution> _next_distribution;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
index f4d5bd6f5aa..558cbada31f 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
@@ -80,6 +80,7 @@ public:
}
void set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state);
+ bool has_pending_cluster_state() const noexcept { return static_cast<bool>(_pending_cluster_state); }
const lib::ClusterState& get_pending_cluster_state() const noexcept { return *_pending_cluster_state; }
/**
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index e41c7940a0d..906129ebf96 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -478,6 +478,8 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace,
}
}
+// TODO STRIPE must only be called when operating in legacy single stripe mode!
+// In other cases, distribution config switching is controlled by top-level distributor, not via framework(tm).
void
DistributorStripe::enableNextDistribution()
{
@@ -485,10 +487,12 @@ DistributorStripe::enableNextDistribution()
_distribution = _nextDistribution;
propagateDefaultDistribution(_distribution);
_nextDistribution = std::shared_ptr<lib::Distribution>();
+ // TODO conditional on whether top-level DB updater is in charge
_bucketDBUpdater.storageDistributionChanged();
}
}
+// TODO STRIPE should probably be invoked by the top-level bucket db updater
void
DistributorStripe::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
@@ -500,6 +504,19 @@ DistributorStripe::propagateDefaultDistribution(
}
}
+// Only called when stripe is in rendezvous freeze
+void
+DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+ auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space());
+ auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space());
+ assert(default_distr && global_distr);
+
+ for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
+ repo->get(document::FixedBucketSpaces::default_space()).setDistribution(default_distr);
+ repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
+ }
+}
+
void
DistributorStripe::propagateClusterStates()
{
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 10b3f54d834..4a43b93354e 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -3,7 +3,6 @@
#pragma once
#include "bucket_spaces_stats_provider.h"
-#include "bucketdbupdater.h"
#include "distributor_host_info_reporter.h"
#include "distributor_stripe_interface.h"
#include "externaloperationhandler.h"
@@ -11,6 +10,8 @@
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
+#include "stripe_access_guard.h"
+#include "stripe_bucket_db_updater.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
@@ -98,7 +99,7 @@ public:
/**
* Invoked when a pending cluster state for a distribution (config)
- * change has been enabled. An invocation of storageDistributionChanged
+ * change has been enabled. An invocation of storage_distribution_changed
* will eventually cause this method to be called, assuming the pending
* cluster state completed successfully.
*/
@@ -169,8 +170,8 @@ public:
return *_bucketIdHasher;
}
- BucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
- const BucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
+ StripeBucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
+ const StripeBucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
IdealStateManager& ideal_state_manager() { return _idealStateManager; }
const IdealStateManager& ideal_state_manager() const { return _idealStateManager; }
ExternalOperationHandler& external_operation_handler() { return _externalOperationHandler; }
@@ -198,6 +199,7 @@ private:
friend class DistributorTestUtil;
friend class MetricUpdateHook;
friend class Distributor;
+ friend class LegacySingleStripeAccessGuard;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
@@ -251,9 +253,10 @@ private:
bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg,
Operation::SP& operation);
- void enableNextDistribution();
- void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+ void enableNextDistribution(); // TODO STRIPE remove once legacy is gone
+ void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone
void propagateClusterStates();
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs);
BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
template <typename NodeFunctor>
@@ -276,7 +279,7 @@ private:
std::unique_ptr<OperationSequencer> _operation_sequencer;
PendingMessageTracker _pendingMessageTracker;
- BucketDBUpdater _bucketDBUpdater;
+ StripeBucketDBUpdater _bucketDBUpdater;
StatusReporterDelegate _distributorStatusDelegate;
StatusReporterDelegate _bucketDBStatusDelegate;
IdealStateManager _idealStateManager;
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
new file mode 100644
index 00000000000..04f210205c3
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
@@ -0,0 +1,88 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "legacy_single_stripe_accessor.h"
+#include "distributor_stripe.h"
+
+namespace storage::distributor {
+
+LegacySingleStripeAccessGuard::LegacySingleStripeAccessGuard(LegacySingleStripeAccessor& accessor,
+ DistributorStripe& stripe)
+ : _accessor(accessor),
+ _stripe(stripe)
+{}
+
+LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() {
+ _accessor.mark_guard_released();
+}
+
+void LegacySingleStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+ _stripe.update_distribution_config(new_configs);
+}
+
+void LegacySingleStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
+ _stripe.getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+ // TODO STRIPE also read only repo?
+}
+
+void LegacySingleStripeAccessGuard::clear_pending_cluster_state_bundle() {
+ _stripe.getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+ // TODO STRIPE also read only repo?
+}
+
+void LegacySingleStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
+ _stripe.enableClusterStateBundle(new_state);
+}
+
+void LegacySingleStripeAccessGuard::notify_distribution_change_enabled() {
+ _stripe.notifyDistributionChangeEnabled();
+}
+
+PotentialDataLossReport
+LegacySingleStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ return _stripe.bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+}
+
+void
+LegacySingleStripeAccessGuard::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ _stripe.bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
+}
+
+void LegacySingleStripeAccessGuard::update_read_snapshot_before_db_pruning() {
+ _stripe.bucket_db_updater().update_read_snapshot_before_db_pruning();
+}
+
+void LegacySingleStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ _stripe.bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+}
+
+void LegacySingleStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ _stripe.bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+}
+
+void LegacySingleStripeAccessGuard::clear_read_only_bucket_repo_databases() {
+ _stripe.bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+}
+
+std::unique_ptr<StripeAccessGuard> LegacySingleStripeAccessor::rendezvous_and_hold_all() {
+ // For sanity checking during development.
+ assert(!_guard_held);
+ _guard_held = true;
+ return std::make_unique<LegacySingleStripeAccessGuard>(*this, _stripe);
+}
+
+void LegacySingleStripeAccessor::mark_guard_released() {
+ assert(_guard_held);
+ _guard_held = false;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
new file mode 100644
index 00000000000..4f8cceb0a70
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
@@ -0,0 +1,67 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "stripe_access_guard.h"
+
+namespace storage::distributor {
+
+class DistributorStripe;
+class LegacySingleStripeAccessor;
+
+/**
+ * Very simple stripe access guard which expects the caller and its single stripe to run in the
+ * same thread. This means there's no actual striping of operations or any thread synchronization
+ * performed. Only intended as a stop-gap while we have legacy stripe behavior.
+ */
+class LegacySingleStripeAccessGuard : public StripeAccessGuard {
+ LegacySingleStripeAccessor& _accessor;
+ DistributorStripe& _stripe;
+public:
+ LegacySingleStripeAccessGuard(LegacySingleStripeAccessor& accessor,
+ DistributorStripe& stripe);
+ ~LegacySingleStripeAccessGuard() override;
+
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
+ void clear_pending_cluster_state_bundle() override;
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override;
+ void notify_distribution_change_enabled() override;
+
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) override;
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) override;
+
+ void update_read_snapshot_before_db_pruning() override;
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
+ void clear_read_only_bucket_repo_databases() override;
+};
+
+/**
+ * Impl of StripeAccessor which creates LegacySingleStripeAccessGuards bound to a single stripe.
+ */
+class LegacySingleStripeAccessor : public StripeAccessor {
+ DistributorStripe& _stripe;
+ bool _guard_held;
+
+ friend class LegacySingleStripeAccessGuard;
+public:
+ explicit LegacySingleStripeAccessor(DistributorStripe& stripe)
+ : _stripe(stripe),
+ _guard_held(false)
+ {}
+ ~LegacySingleStripeAccessor() override = default;
+
+ std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() override;
+private:
+ void mark_guard_released();
+};
+
+}
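For orientation, this is roughly how the top-level BucketDBUpdater is expected to drive a transition through this interface. The calls mirror the guard methods declared above, but the surrounding function and its parameters are illustrative only; in the real flow the guard is released between DB pruning and the asynchronous bucket info fetch rather than held throughout.

    #include "stripe_access_guard.h"
    // (Other includes elided; assumes the types above are visible.)

    namespace storage::distributor {

    void apply_new_state_sketch(StripeAccessor& accessor,
                                const lib::ClusterStateBundle& bundle) {
        // Rendezvous: all stripe threads are parked while the guard lives.
        auto guard = accessor.rendezvous_and_hold_all();
        guard->set_pending_cluster_state_bundle(bundle);
        // ... prune superfluous buckets, merge gathered bucket info, etc. ...
        guard->enable_cluster_state_bundle(bundle);
        guard->clear_pending_cluster_state_bundle();
    } // guard destroyed: accessor marked released, stripes resume

    }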
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index af6223afe28..8e54cfcb27d 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -4,6 +4,7 @@
#include "clusterinformation.h"
#include "pendingclusterstate.h"
#include "distributor_bucket_space.h"
+#include "stripe_access_guard.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <algorithm>
@@ -18,13 +19,15 @@ using lib::NodeType;
using lib::NodeState;
PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
+ document::BucketSpace bucket_space,
DistributorBucketSpace &distributorBucketSpace,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp)
- : _entries(),
+ : _bucket_space(bucket_space),
+ _entries(),
_iter(0),
_removedBuckets(),
_missingEntries(),
@@ -53,7 +56,7 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus
PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() = default;
PendingBucketSpaceDbTransition::Range
-PendingBucketSpaceDbTransition::skipAllForSameBucket()
+PendingBucketSpaceDbTransition::DbMerger::skipAllForSameBucket()
{
Range r(_iter, _iter);
@@ -68,7 +71,7 @@ PendingBucketSpaceDbTransition::skipAllForSameBucket()
}
std::vector<BucketCopy>
-PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range)
{
std::vector<BucketCopy> copiesToAdd;
for (uint32_t i = range.first; i < range.second; ++i) {
@@ -83,28 +86,21 @@ PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Ent
}
void
-PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::insertInfo(BucketDatabase::Entry& info, const Range& range)
{
std::vector<BucketCopy> copiesToAddOrUpdate(
getCopiesThatAreNewOrAltered(info, range));
- const auto &dist(_distributorBucketSpace.getDistribution());
std::vector<uint16_t> order(
- dist.getIdealStorageNodes(
- _newClusterState,
+ _distribution.getIdealStorageNodes(
+ _new_state,
_entries[range.first].bucket_id(),
- _clusterInfo->getStorageUpStates()));
+ _storage_up_states));
info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER);
}
-std::string
-PendingBucketSpaceDbTransition::requestNodesToString()
-{
- return _pendingClusterState.requestNodesToString();
-}
-
bool
-PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId)
+PendingBucketSpaceDbTransition::DbMerger::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId)
{
bool updated = false;
for (uint32_t i = 0; i < e->getNodeCount();) {
@@ -116,7 +112,7 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat
// mark a single remaining replica as trusted even though there might
// be one or more additional replicas pending merge into the database.
if (nodeIsOutdated(entryNode)
- && (info.getTimestamp() < _creationTimestamp)
+ && (info.getTimestamp() < _creation_timestamp)
&& e->removeNode(entryNode, TrustedUpdate::DEFER))
{
LOG(spam,
@@ -133,21 +129,21 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat
}
bool
-PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const
+PendingBucketSpaceDbTransition::DbMerger::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const
{
return ((_iter < _entries.size())
&& (_entries[_iter].bucket_key < bucket_key));
}
bool
-PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const
+PendingBucketSpaceDbTransition::DbMerger::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const
{
return _iter < _entries.size() && _entries[_iter].bucket_key == bucket_key;
}
using MergeResult = BucketDatabase::MergingProcessor::Result;
-MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger) {
+MergeResult PendingBucketSpaceDbTransition::DbMerger::merge(BucketDatabase::Merger& merger) {
const uint64_t bucket_key = merger.bucket_key();
while (databaseIteratorHasPassedBucketInfoIterator(bucket_key)) {
@@ -158,9 +154,7 @@ MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger
auto& e = merger.current_entry();
document::BucketId bucketId(e.getBucketId());
- LOG(spam,
- "Before merging info from nodes [%s], bucket %s had info %s",
- requestNodesToString().c_str(),
+ LOG(spam, "Before merging info, bucket %s had info %s",
bucketId.toString().c_str(),
e.getBucketInfo().toString().c_str());
@@ -185,14 +179,14 @@ MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger
return MergeResult::KeepUnchanged;
}
-void PendingBucketSpaceDbTransition::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) {
+void PendingBucketSpaceDbTransition::DbMerger::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) {
while (_iter < _entries.size()) {
addToInserter(inserter, skipAllForSameBucket());
}
}
void
-PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::addToMerger(BucketDatabase::Merger& merger, const Range& range)
{
const auto bucket_id = _entries[range.first].bucket_id();
LOG(spam, "Adding new bucket %s with %d copies",
@@ -202,16 +196,14 @@ PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, cons
BucketDatabase::Entry e(bucket_id, BucketInfo());
insertInfo(e, range);
if (e->getLastGarbageCollectionTime() == 0) {
- e->setLastGarbageCollectionTime(
- framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
+ e->setLastGarbageCollectionTime(framework::MicroSecTime(_creation_timestamp).getSeconds().getTime());
}
e.getBucketInfo().updateTrusted();
merger.insert_before_current(bucket_id, e);
}
void
-PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range)
+PendingBucketSpaceDbTransition::DbMerger::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range)
{
// TODO dedupe
const auto bucket_id = _entries[range.first].bucket_id();
@@ -222,20 +214,32 @@ PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter&
BucketDatabase::Entry e(bucket_id, BucketInfo());
insertInfo(e, range);
if (e->getLastGarbageCollectionTime() == 0) {
- e->setLastGarbageCollectionTime(
- framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
+ e->setLastGarbageCollectionTime(framework::MicroSecTime(_creation_timestamp).getSeconds().getTime());
}
e.getBucketInfo().updateTrusted();
inserter.insert_at_end(bucket_id, e);
}
+// TODO STRIPE remove legacy single stripe stuff
void
PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
{
BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
std::sort(_entries.begin(), _entries.end());
- db.merge(*this);
+
+ const auto& dist = _distributorBucketSpace.getDistribution();
+ DbMerger merger(_creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
+
+ db.merge(merger);
+}
+
+void
+PendingBucketSpaceDbTransition::merge_into_bucket_databases(StripeAccessGuard& guard)
+{
+ std::sort(_entries.begin(), _entries.end());
+ const auto& dist = _distributorBucketSpace.getDistribution();
+ guard.merge_entries_into_db(_bucket_space, _creationTimestamp, dist, _newClusterState,
+ _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
}
void
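The extracted DbMerger is, at heart, a single-pass merge of two key-ordered sequences: the bucket database and the sorted _entries list gathered from content nodes. The stand-alone sketch below models that control flow; the while-loop corresponds to databaseIteratorHasPassedBucketInfoIterator(), the tail loop to insert_remaining_at_end(), and Entry plus the info-folding step are simplified stand-ins.

    #include <cstdint>
    #include <vector>

    struct Entry { uint64_t key; int info; }; // simplified stand-in

    void merge_sorted(std::vector<Entry>& db, const std::vector<Entry>& gathered) {
        std::vector<Entry> out;
        size_t iter = 0; // mirrors DbMerger::_iter over the gathered entries
        for (const Entry& existing : db) {
            // Insert all gathered entries whose key precedes the current DB
            // entry (databaseIteratorHasPassedBucketInfoIterator in real code).
            while (iter < gathered.size() && gathered[iter].key < existing.key) {
                out.push_back(gathered[iter++]);
            }
            if (iter < gathered.size() && gathered[iter].key == existing.key) {
                // Same bucket: fold gathered replica info into existing entry.
                Entry updated = existing;
                updated.info += gathered[iter++].info;
                out.push_back(updated);
            } else {
                out.push_back(existing);
            }
        }
        // insert_remaining_at_end(): gathered entries past the last DB key.
        while (iter < gathered.size()) {
            out.push_back(gathered[iter++]);
        }
        db = std::move(out);
    }

    int main() {
        std::vector<Entry> db{{1, 10}, {4, 40}};
        std::vector<Entry> gathered{{2, 20}, {4, 4}, {7, 70}};
        merge_sorted(db, gathered);
        return (db.size() == 4 && db[1].key == 2 && db[2].info == 44) ? 0 : 1;
    }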
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
index 232f1186879..86d2c89bf23 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -3,25 +3,30 @@
#include "pending_bucket_space_db_transition_entry.h"
#include "outdated_nodes.h"
+#include <vespa/document/bucket/bucketspace.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <unordered_map>
namespace storage::api { class RequestBucketInfoReply; }
-namespace storage::lib { class ClusterState; class State; }
+namespace storage::lib {
+class ClusterState;
+class Distribution;
+class State;
+}
namespace storage::distributor {
class ClusterInformation;
class PendingClusterState;
class DistributorBucketSpace;
+class StripeAccessGuard;
/**
* Class used by PendingClusterState to track request bucket info
* reply result within a bucket space and apply it to the distributor
* bucket database when switching to the pending cluster state.
*/
-class PendingBucketSpaceDbTransition : public BucketDatabase::MergingProcessor
-{
+class PendingBucketSpaceDbTransition {
public:
using Entry = dbtransition::Entry;
using EntryList = std::vector<Entry>;
@@ -29,6 +34,7 @@ public:
private:
using Range = std::pair<uint32_t, uint32_t>;
+ document::BucketSpace _bucket_space;
EntryList _entries;
uint32_t _iter;
std::vector<document::BucketId> _removedBuckets;
@@ -42,45 +48,16 @@ private:
// may be down and thus cannot get a request.
OutdatedNodes _outdatedNodes;
- const lib::ClusterState &_prevClusterState;
- const lib::ClusterState &_newClusterState;
+ const lib::ClusterState& _prevClusterState;
+ const lib::ClusterState& _newClusterState;
const api::Timestamp _creationTimestamp;
- const PendingClusterState &_pendingClusterState;
- DistributorBucketSpace &_distributorBucketSpace;
+ const PendingClusterState& _pendingClusterState;
+ DistributorBucketSpace& _distributorBucketSpace;
uint16_t _distributorIndex;
bool _bucketOwnershipTransfer;
std::unordered_map<uint16_t, size_t> _rejectedRequests;
std::unordered_map<uint16_t, size_t> _failed_requests; // Also includes rejections
- BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override;
- void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override;
-
- /**
- * Skips through all entries for the same bucket and returns
- * the range in the entry list for which they were found.
- * The range is [from, to>
- */
- Range skipAllForSameBucket();
-
- std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
- void insertInfo(BucketDatabase::Entry& info, const Range& range);
- void addToMerger(BucketDatabase::Merger& merger, const Range& range);
- void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range);
-
- bool nodeIsOutdated(uint16_t node) const {
- return (_outdatedNodes.find(node) != _outdatedNodes.end());
- }
-
- // Returns whether at least one replica was removed from the entry.
- // Does NOT implicitly update trusted status on remaining replicas; caller must do
- // this explicitly.
- bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
-
- // Helper methods for iterating over _entries
- bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const;
- bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const;
- std::string requestNodesToString();
-
bool distributorChanged();
static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw);
bool storageNodeUpInNewState(uint16_t node) const;
@@ -94,17 +71,75 @@ private:
void updateSetOfNodesThatAreOutdated();
public:
+ // Abstracts away the details of how an entry list gathered from content nodes
+ // is actually diffed and merged into a database.
+ class DbMerger : public BucketDatabase::MergingProcessor {
+ api::Timestamp _creation_timestamp;
+ const lib::Distribution& _distribution;
+ const lib::ClusterState& _new_state;
+ const char* _storage_up_states;
+ const std::unordered_set<uint16_t>& _outdated_nodes; // TODO hash_set
+ const std::vector<dbtransition::Entry>& _entries;
+ uint32_t _iter;
+ public:
+ DbMerger(api::Timestamp creation_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+ : _creation_timestamp(creation_timestamp),
+ _distribution(distribution),
+ _new_state(new_state),
+ _storage_up_states(storage_up_states),
+ _outdated_nodes(outdated_nodes),
+ _entries(entries),
+ _iter(0)
+ {}
+ ~DbMerger() override = default;
+
+ BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override;
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override;
+
+ /**
+ * Skips through all entries for the same bucket and returns
+ * the range in the entry list for which they were found.
+ * The range is [from, to>
+ */
+ Range skipAllForSameBucket();
+
+ std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
+ void insertInfo(BucketDatabase::Entry& info, const Range& range);
+ void addToMerger(BucketDatabase::Merger& merger, const Range& range);
+ void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range);
+
+ // Returns whether at least one replica was removed from the entry.
+ // Does NOT implicitly update trusted status on remaining replicas; caller must do
+ // this explicitly.
+ bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
+
+ // Helper methods for iterating over _entries
+ bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const;
+ bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const;
+
+ bool nodeIsOutdated(uint16_t node) const {
+ return (_outdated_nodes.find(node) != _outdated_nodes.end());
+ }
+ };
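+
+    // Intended usage is a single merge pass driven by the bucket DB (a sketch;
+    // names are illustrative, see StripeBucketDBUpdater::merge_entries_into_db):
+    //
+    //   DbMerger merger(gathered_at, distribution, new_state,
+    //                   storage_up_states, outdated_nodes, entries);
+    //   bucket_db.merge(merger);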
+
PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
+ document::BucketSpace bucket_space,
DistributorBucketSpace &distributorBucketSpace,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp);
- ~PendingBucketSpaceDbTransition() override;
+ ~PendingBucketSpaceDbTransition();
// Merges all the results with the corresponding bucket database.
void mergeIntoBucketDatabase();
+ void merge_into_bucket_databases(StripeAccessGuard& guard);
// Adds the info from the reply to our list of information.
void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node);
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index a7fd5a5af53..9e811ea9d23 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -84,9 +84,9 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged,
auto onItr = outdatedNodesMap.find(elem.first);
const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second;
auto pendingTransition =
- std::make_unique<PendingBucketSpaceDbTransition>
- (*this, *elem.second, distributionChanged, outdatedNodes,
- _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp);
+ std::make_unique<PendingBucketSpaceDbTransition>(
+ *this, elem.first, *elem.second, distributionChanged, outdatedNodes,
+ _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp);
if (pendingTransition->getBucketOwnershipTransfer()) {
_bucketOwnershipTransfer = true;
}
@@ -331,6 +331,14 @@ PendingClusterState::mergeIntoBucketDatabases()
}
void
+PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard)
+{
+ for (auto &elem : _pendingTransitions) {
+ elem.second->merge_into_bucket_databases(guard);
+ }
+}
+
+void
PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const
{
using namespace vespalib::xml;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 42b7bf0dcf2..af0c85fab95 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -18,6 +18,7 @@ namespace storage::distributor {
class DistributorMessageSender;
class PendingBucketSpaceDbTransition;
class DistributorBucketSpaceRepo;
+class StripeAccessGuard;
/**
* Class used by BucketDBUpdater to track request bucket info
@@ -146,6 +147,8 @@ public:
* Merges all the results with the corresponding bucket databases.
*/
void mergeIntoBucketDatabases();
+ void merge_into_bucket_databases(StripeAccessGuard& guard);
+
// Get pending transition for a specific bucket space. Only used by unit test.
PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace);
diff --git a/storage/src/vespa/storage/distributor/potential_data_loss_report.h b/storage/src/vespa/storage/distributor/potential_data_loss_report.h
new file mode 100644
index 00000000000..96abd787649
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/potential_data_loss_report.h
@@ -0,0 +1,22 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstddef>
+
+namespace storage::distributor {
+
+/**
+ * Represents the amount of data a distributor reasons _may_ have become unavailable
+ * due to all bucket replicas no longer being present.
+ */
+struct PotentialDataLossReport {
+ size_t buckets = 0;
+ size_t documents = 0;
+
+ void merge(const PotentialDataLossReport& other) noexcept {
+ buckets += other.buckets;
+ documents += other.documents;
+ }
+};
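+
+// Per-space reports can be folded into a cluster-wide total via merge(), e.g.
+// (a sketch):
+//
+//   PotentialDataLossReport total;
+//   for (const auto& space_report : per_space_reports) {
+//       total.merge(space_report);
+//   }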
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
new file mode 100644
index 00000000000..8d848450cd2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -0,0 +1,68 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "bucket_space_distribution_configs.h"
+#include "pending_bucket_space_db_transition_entry.h"
+#include "potential_data_loss_report.h"
+#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/storageapi/defs.h>
+#include <memory>
+#include <unordered_set> // TODO use hash_set instead
+
+namespace storage::lib {
+class ClusterState;
+class ClusterStateBundle;
+class Distribution;
+}
+
+namespace storage::distributor {
+
+/**
+ * A stripe access guard guarantees that the holder of a guard can access underlying
+ * stripes via it in a thread safe manner. In particular, while any access guard is
+ * held, all stripe threads must be in a safe rendezvous location with no race conditions
+ * possible. When a guard goes out of scope, the stripe threads may resume operation.
+ */
+class StripeAccessGuard {
+public:
+ virtual ~StripeAccessGuard() = default;
+
+ virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+ virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
+ virtual void clear_pending_cluster_state_bundle() = 0;
+ virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void notify_distribution_change_enabled() = 0;
+
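+    // Prunes buckets that are no longer owned by this distributor in the given
+    // space; the returned report summarizes buckets/documents that may have
+    // become unavailable as a consequence.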
+ virtual PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) = 0;
+ virtual void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) = 0;
+
+ virtual void update_read_snapshot_before_db_pruning() = 0;
+ virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
+ virtual void clear_read_only_bucket_repo_databases() = 0;
+
+};
+
+/**
+ * Provides a factory for guards that protect access to underlying stripes.
+ *
+ * Important: at most one StripeAccessGuard may exist at any given time. Creating
+ * concurrent guards is undefined behavior.
+ */
+class StripeAccessor {
+public:
+ virtual ~StripeAccessor() = default;
+
+ virtual std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() = 0;
+ // TODO also accessor for a single particular stripe?
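+    //
+    // Intended usage (a sketch; assumes some concrete accessor implementation):
+    //
+    //   auto guard = accessor.rendezvous_and_hold_all();
+    //   guard->set_pending_cluster_state_bundle(pending_state);
+    //   // ... further guarded stripe mutations ...
+    //   // Destroying the guard lets the stripe threads resume.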
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
new file mode 100644
index 00000000000..cf1781e0ae8
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
@@ -0,0 +1,1132 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "stripe_bucket_db_updater.h"
+#include "bucket_db_prune_elision.h"
+#include "bucket_space_distribution_context.h"
+#include "distributor.h"
+#include "distributor_bucket_space.h"
+#include "distributormetricsset.h"
+#include "pending_bucket_space_db_transition.h"
+#include "potential_data_loss_report.h"
+#include "simpleclusterinformation.h"
+#include "stripe_access_guard.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/util/xmlstream.h>
+#include <thread>
+
+#include <vespa/log/bufferedlogger.h>
+LOG_SETUP(".distributor.stripe_bucket_db_updater");
+
+using storage::lib::Node;
+using storage::lib::NodeType;
+using document::BucketSpace;
+
+namespace storage::distributor {
+
+StripeBucketDBUpdater::StripeBucketDBUpdater(DistributorStripeInterface& owner,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ DistributorMessageSender& sender,
+ DistributorComponentRegister& compReg)
+ : framework::StatusReporter("bucketdb", "Bucket DB Updater"),
+ _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
+ _node_ctx(_distributorComponent),
+ _op_ctx(_distributorComponent),
+ _distributor_interface(_distributorComponent.getDistributor()),
+ _delayedRequests(),
+ _sentMessages(),
+ _pendingClusterState(),
+ _history(),
+ _sender(sender),
+ _enqueuedRechecks(),
+ _outdatedNodesMap(),
+ _transitionTimer(_node_ctx.clock()),
+ _stale_reads_enabled(false),
+ _active_distribution_contexts(),
+ _explicit_transition_read_guard(),
+ _distribution_context_mutex()
+{
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ _active_distribution_contexts.emplace(
+ elem.first,
+ BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
+ _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
+ }
+}
+
+StripeBucketDBUpdater::~StripeBucketDBUpdater() = default;
+
+OperationRoutingSnapshot StripeBucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ const auto bucket_space = bucket.getBucketSpace();
+ std::lock_guard lock(_distribution_context_mutex);
+ auto active_state_iter = _active_distribution_contexts.find(bucket_space);
+ assert(active_state_iter != _active_distribution_contexts.cend());
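+    // Routing is resolved in order: the bucket must be owned in the active
+    // state; if it is not also owned in the pending state it can only be served
+    // from the read-only DB, which requires stale reads to be enabled.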
+ auto& state = *active_state_iter->second;
+ if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId());
+ if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const auto& space_repo = bucket_present_in_mutable_db
+ ? _op_ctx.bucket_space_repo()
+ : _op_ctx.read_only_bucket_space_repo();
+ auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
+ assert(existing_guard_iter != _explicit_transition_read_guard.cend());
+    auto db_guard = existing_guard_iter->second
+                    ? existing_guard_iter->second
+                    : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard();
+ return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo);
+}
+
+void
+StripeBucketDBUpdater::flush()
+{
+ for (auto & entry : _sentMessages) {
+ // Cannot sendDown MergeBucketReplies during flushing, since
+ // all lower links have been closed
+ if (entry.second._mergeReplyGuard) {
+ entry.second._mergeReplyGuard->resetReply();
+ }
+ }
+ _sentMessages.clear();
+}
+
+void
+StripeBucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& indent) const
+{
+ (void) verbose; (void) indent;
+ out << "BucketDBUpdater";
+}
+
+bool
+StripeBucketDBUpdater::shouldDeferStateEnabling() const noexcept
+{
+ return stale_reads_enabled();
+}
+
+bool
+StripeBucketDBUpdater::hasPendingClusterState() const
+{
+ // Defer to the repo instead of checking our own internal pending cluster state,
+ // as we won't have one if the top level distributor handles this for all stripes.
+ // But if we're operating in "legacy" mode with this stripe bucket DB updater as
+ // the authoritative source, there should always be an internal pending cluster
+ // state if the repo is tagged as having one as well.
+ // Since we also set a pending cluster state bundle when triggered by a distribution
+ // config change, this check also covers that case.
+ return _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).has_pending_cluster_state();
+}
+
+const lib::ClusterState*
+StripeBucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const {
+ auto& distr_space = _op_ctx.bucket_space_repo().get(space);
+ return (distr_space.has_pending_cluster_state()
+ ? &distr_space.get_pending_cluster_state()
+ : nullptr);
+}
+
+void
+StripeBucketDBUpdater::sendRequestBucketInfo(
+ uint16_t node,
+ const document::Bucket& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
+{
+ if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
+ return;
+ }
+
+ std::vector<document::BucketId> buckets;
+ buckets.push_back(bucket.getBucketId());
+
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets);
+
+ LOG(debug,
+ "Sending request bucket info command %" PRIu64 " for "
+ "bucket %s to node %u",
+ msg->getMsgId(),
+ bucket.toString().c_str(),
+ node);
+
+ msg->setPriority(50);
+ msg->setAddress(_node_ctx.node_address(node));
+
+ _sentMessages[msg->getMsgId()] =
+ BucketRequest(node, _op_ctx.generate_unique_timestamp(),
+ bucket, mergeReplyGuard);
+ _sender.sendCommand(msg);
+}
+
+void
+StripeBucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
+ const document::Bucket& bucket)
+{
+ sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
+}
+
+namespace {
+
+class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor {
+ using NewEntries = std::vector<BucketDatabase::Entry>;
+ NewEntries::const_iterator _current;
+ const NewEntries::const_iterator _last;
+public:
+ explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries)
+ : _current(new_entries.cbegin()),
+ _last(new_entries.cend())
+ {}
+
+ Result merge(BucketDatabase::Merger& m) override {
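+        // Both the gathered entries and the DB merge pass are ordered by bucket
+        // key, so this is a standard sorted-list merge: emit all new entries
+        // sorting before the current DB bucket, then handle an exact key match.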
+ const uint64_t key_to_insert = m.bucket_key();
+ uint64_t key_at_cursor = 0;
+ while (_current != _last) {
+ key_at_cursor = _current->getBucketId().toKey();
+ if (key_at_cursor >= key_to_insert) {
+ break;
+ }
+ m.insert_before_current(_current->getBucketId(), *_current);
+ ++_current;
+ }
+ if ((_current != _last) && (key_at_cursor == key_to_insert)) {
+ // If we encounter a bucket that already exists, replace value wholesale.
+ // Don't try to cleverly merge replicas, as the values we currently hold
+ // in the read-only DB may be stale.
+ // Note that this case shouldn't really happen, since we only add previously
+ // owned buckets to the read-only DB, and subsequent adds to a non-empty DB
+ // can only happen for state preemptions. Since ownership is not regained
+ // before a state is stable, a bucket is only added once. But we handle it
+ // anyway in case this changes at some point in the future.
+            m.current_entry() = *_current;
+            ++_current;
+            return Result::Update;
+ }
+ return Result::KeepUnchanged;
+ }
+
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override {
+ for (; _current != _last; ++_current) {
+ inserter.insert_at_end(_current->getBucketId(), *_current);
+ }
+ }
+};
+
+}
+
+void
+StripeBucketDBUpdater::removeSuperfluousBuckets(
+ const lib::ClusterStateBundle& newState,
+ bool is_distribution_config_change)
+{
+ const bool move_to_read_only_db = shouldDeferStateEnabling();
+ const char* up_states = _op_ctx.storage_node_up_states();
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ const auto& newDistribution(elem.second->getDistribution());
+ const auto& oldClusterState(elem.second->getClusterState());
+ const auto& new_cluster_state = newState.getDerivedClusterState(elem.first);
+
+ // Running a full DB sweep is expensive, so if the cluster state transition does
+ // not actually indicate that buckets should possibly be removed, we elide it entirely.
+ if (!is_distribution_config_change
+ && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states))
+ {
+ LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'",
+ document::FixedBucketSpaces::to_string(elem.first).data(),
+ oldClusterState.toString().c_str(), new_cluster_state->toString().c_str());
+ continue;
+ }
+
+ auto& bucketDb(elem.second->getBucketDatabase());
+ auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase());
+
+ // Remove all buckets not belonging to this distributor, or
+ // being on storage nodes that are no longer up.
+ MergingNodeRemover proc(
+ oldClusterState,
+ *new_cluster_state,
+ _node_ctx.node_index(),
+ newDistribution,
+ up_states,
+ move_to_read_only_db);
+
+ bucketDb.merge(proc);
+ if (move_to_read_only_db) {
+ ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
+ readOnlyDb.merge(read_only_merger);
+ }
+ maybe_inject_simulated_db_pruning_delay();
+ }
+}
+
+PotentialDataLossReport
+StripeBucketDBUpdater::remove_superfluous_buckets(
+ document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ (void)is_distribution_change; // TODO remove if not needed
+ const bool move_to_read_only_db = shouldDeferStateEnabling();
+ const char* up_states = _op_ctx.storage_node_up_states();
+
+ auto& s = _op_ctx.bucket_space_repo().get(bucket_space);
+ const auto& new_distribution = s.getDistribution();
+ const auto& old_cluster_state = s.getClusterState();
+ // Elision of DB sweep is done at a higher level, so we don't have to do that here.
+ auto& bucket_db = s.getBucketDatabase();
+ auto& read_only_db = _op_ctx.read_only_bucket_space_repo().get(bucket_space).getBucketDatabase();
+
+ // Remove all buckets not belonging to this distributor, or
+ // being on storage nodes that are no longer up.
+ MergingNodeRemover proc(
+ old_cluster_state,
+ new_state,
+ _node_ctx.node_index(),
+ new_distribution,
+ up_states,
+ move_to_read_only_db);
+
+ bucket_db.merge(proc);
+ if (move_to_read_only_db) {
+ ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
+ read_only_db.merge(read_only_merger);
+ }
+ PotentialDataLossReport report;
+ report.buckets = proc.removed_buckets();
+ report.documents = proc.removed_documents();
+ return report;
+}
+
+void
+StripeBucketDBUpdater::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ auto& s = _op_ctx.bucket_space_repo().get(bucket_space);
+ auto& bucket_db = s.getBucketDatabase();
+
+ PendingBucketSpaceDbTransition::DbMerger merger(gathered_at_timestamp, distribution, new_state,
+ storage_up_states, outdated_nodes, entries);
+ bucket_db.merge(merger);
+}
+
+namespace {
+
+void maybe_sleep_for(std::chrono::milliseconds ms) {
+ if (ms.count() > 0) {
+ std::this_thread::sleep_for(ms);
+ }
+}
+
+}
+
+void
+StripeBucketDBUpdater::maybe_inject_simulated_db_pruning_delay() {
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency());
+}
+
+void
+StripeBucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency());
+}
+
+void
+StripeBucketDBUpdater::ensureTransitionTimerStarted()
+{
+ // Don't overwrite start time if we're already processing a state, as
+ // that will make transition times appear artificially low.
+ if (!hasPendingClusterState()) {
+ _transitionTimer = framework::MilliSecTimer(
+ _node_ctx.clock());
+ }
+}
+
+void
+StripeBucketDBUpdater::completeTransitionTimer()
+{
+ _distributor_interface.getMetrics()
+ .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble());
+}
+
+void
+StripeBucketDBUpdater::clearReadOnlyBucketRepoDatabases()
+{
+ for (auto& space : _op_ctx.read_only_bucket_space_repo()) {
+ space.second->getBucketDatabase().clear();
+ }
+}
+
+void
+StripeBucketDBUpdater::storageDistributionChanged()
+{
+ ensureTransitionTimerStarted();
+
+ removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true);
+
+ auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+ _node_ctx.node_index(),
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
+ _pendingClusterState = PendingClusterState::createForDistributionChange(
+ _node_ctx.clock(),
+ std::move(clusterInfo),
+ _sender,
+ _op_ctx.bucket_space_repo(),
+ _op_ctx.generate_unique_timestamp());
+ _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+}
+
+void
+StripeBucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
+{
+ if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
+ _distributor_interface.getMessageSender().sendUp(
+ std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
+ }
+}
+
+void
+StripeBucketDBUpdater::replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion)
+{
+ auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
+ reply->setActualVersion(actualVersion);
+ _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_before_db_pruning() {
+ std::lock_guard lock(_distribution_context_mutex);
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ // At this point, we're still operating with a distribution context _without_ a
+ // pending state, i.e. anyone using the context will expect to find buckets
+        // in the DB that correspond to how the database looked prior to pruning
+        // buckets from it. To ensure this is not violated, take a snapshot of the
+ // _mutable_ DB and expose this. This snapshot only lives until we atomically
+ // flip to expose a distribution context that includes the new, pending state.
+ // At that point, the read-only DB is known to contain the buckets that have
+ // been pruned away, so we can release the mutable DB snapshot safely.
+ // TODO test for, and handle, state preemption case!
+ _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
+ }
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto old_default_state = _op_ctx.bucket_space_repo().get(
+ document::FixedBucketSpaces::default_space()).cluster_state_sp();
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto old_cluster_state = elem.second->cluster_state_sp();
+ auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ BucketSpaceDistributionContext::make_state_transition(
+ std::move(old_cluster_state),
+ old_default_state,
+ std::move(new_cluster_state),
+ std::move(new_distribution),
+ _node_ctx.node_index()));
+ // We can now remove the explicit mutable DB snapshot, as the buckets that have been
+ // pruned away are visible in the read-only DB.
+ _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
+ }
+}
+
+void StripeBucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ BucketSpaceDistributionContext::make_stable_state(
+ std::move(new_cluster_state),
+ default_cluster_state,
+ std::move(new_distribution),
+ _node_ctx.node_index()));
+ }
+}
+
+bool
+StripeBucketDBUpdater::onSetSystemState(
+ const std::shared_ptr<api::SetSystemStateCommand>& cmd)
+{
+ LOG(debug,
+ "Received new cluster state %s",
+ cmd->getSystemState().toString().c_str());
+
+ const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
+ const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
+
+ if (state == oldState) {
+ return false;
+ }
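+    // Transition sequence from here: snapshot the mutable DB, prune buckets we
+    // no longer own (possibly moving them to the read-only DB), publish the
+    // pending-state distribution contexts, then start fetching bucket info for
+    // the new state via a new PendingClusterState.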
+ ensureTransitionTimerStarted();
+ // Separate timer since _transition_timer might span multiple pending states.
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
+ update_read_snapshot_before_db_pruning();
+ const auto& bundle = cmd->getClusterStateBundle();
+ removeSuperfluousBuckets(bundle, false);
+ update_read_snapshot_after_db_pruning(bundle);
+ replyToPreviousPendingClusterStateIfAny();
+
+ auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+ _node_ctx.node_index(),
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
+ _pendingClusterState = PendingClusterState::createForClusterStateChange(
+ _node_ctx.clock(),
+ std::move(clusterInfo),
+ _sender,
+ _op_ctx.bucket_space_repo(),
+ cmd,
+ _outdatedNodesMap,
+ _op_ctx.generate_unique_timestamp());
+ _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+
+ _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
+ process_timer.getElapsedTimeAsDouble());
+
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ if (isPendingClusterStateCompleted()) {
+ processCompletedPendingClusterState();
+ }
+ return true;
+}
+
+bool
+StripeBucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+ if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
+ const auto pending_version = _pendingClusterState->clusterStateVersion();
+ if (pending_version == cmd->version()) {
+ if (isPendingClusterStateCompleted()) {
+ assert(_pendingClusterState->isDeferred());
+ activatePendingClusterState();
+ } else {
+ LOG(error, "Received cluster state activation for pending version %u "
+ "without pending state being complete yet. This is not expected, "
+ "as no activation should be sent before all distributors have "
+ "reported that state processing is complete.", pending_version);
+ replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+ return true;
+ }
+ } else {
+ replyToActivationWithActualVersion(*cmd, pending_version);
+ return true;
+ }
+ } else if (shouldDeferStateEnabling()) {
+ // Likely just a resend, but log warn for now to get a feel of how common it is.
+ LOG(warning, "Received cluster state activation command for version %u, which "
+ "has no corresponding pending state. Likely resent operation.", cmd->version());
+ } else {
+ LOG(debug, "Received cluster state activation command for version %u, but distributor "
+ "config does not have deferred activation enabled. Treating as no-op.", cmd->version());
+ }
+ // Fall through to next link in call chain that cares about this message.
+ return false;
+}
+
+StripeBucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
+{
+ if (_reply) {
+ _distributor_interface.handleCompletedMerge(_reply);
+ }
+}
+
+bool
+StripeBucketDBUpdater::onMergeBucketReply(
+ const std::shared_ptr<api::MergeBucketReply>& reply)
+{
+ auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
+
+    // In case the merge was somehow unsuccessful, or some nodes weren't
+    // actually merged (source-only nodes?), we request the bucket info for the
+    // bucket again to make sure it's OK.
+ for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
+ sendRequestBucketInfo(reply->getNodes()[i].index,
+ reply->getBucket(),
+ replyGuard);
+ }
+
+ return true;
+}
+
+void
+StripeBucketDBUpdater::enqueueRecheckUntilPendingStateEnabled(
+ uint16_t node,
+ const document::Bucket& bucket)
+{
+ LOG(spam,
+ "DB updater has a pending cluster state, enqueuing recheck "
+ "of bucket %s on node %u until state is done processing",
+ bucket.toString().c_str(),
+ node);
+ _enqueuedRechecks.insert(EnqueuedBucketRecheck(node, bucket));
+}
+
+void
+StripeBucketDBUpdater::sendAllQueuedBucketRechecks()
+{
+ LOG(spam,
+ "Sending %zu queued bucket rechecks previously received "
+ "via NotifyBucketChange commands",
+ _enqueuedRechecks.size());
+
+ for (const auto & entry :_enqueuedRechecks) {
+ sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
+ }
+ _enqueuedRechecks.clear();
+}
+
+bool
+StripeBucketDBUpdater::onNotifyBucketChange(
+ const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd)
+{
+ // Immediately schedule reply to ensure it is sent.
+ _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd));
+
+ if (!cmd->getBucketInfo().valid()) {
+ LOG(error,
+ "Received invalid bucket info for bucket %s from notify bucket "
+ "change! Not updating bucket.",
+ cmd->getBucketId().toString().c_str());
+ return true;
+ }
+ LOG(debug,
+ "Received notify bucket change from node %u for bucket %s with %s.",
+ cmd->getSourceIndex(),
+ cmd->getBucketId().toString().c_str(),
+ cmd->getBucketInfo().toString().c_str());
+
+ if (hasPendingClusterState()) {
+ enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(),
+ cmd->getBucket());
+ } else {
+ sendRequestBucketInfo(cmd->getSourceIndex(),
+ cmd->getBucket(),
+ std::shared_ptr<MergeReplyGuard>());
+ }
+
+ return true;
+}
+
+namespace {
+
+bool sort_pred(const BucketListMerger::BucketEntry& left,
+ const BucketListMerger::BucketEntry& right) {
+ return left.first < right.first;
+}
+
+}
+
+bool
+StripeBucketDBUpdater::onRequestBucketInfoReply(
+ const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+{
+ if (pendingClusterStateAccepted(repl)) {
+ return true;
+ }
+ return processSingleBucketInfoReply(repl);
+}
+
+bool
+StripeBucketDBUpdater::pendingClusterStateAccepted(
+ const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+{
+ if (_pendingClusterState.get()
+ && _pendingClusterState->onRequestBucketInfoReply(repl))
+ {
+ if (isPendingClusterStateCompleted()) {
+ processCompletedPendingClusterState();
+ }
+ return true;
+ }
+ LOG(spam,
+ "Reply %s was not accepted by pending cluster state",
+ repl->toString().c_str());
+ return false;
+}
+
+void
+StripeBucketDBUpdater::handleSingleBucketInfoFailure(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req)
+{
+ LOG(debug, "Request bucket info failed towards node %d: error was %s",
+ req.targetNode, repl->getResult().toString().c_str());
+
+ if (req.bucket.getBucketId() != document::BucketId(0)) {
+ framework::MilliSecTime sendTime(_node_ctx.clock());
+ sendTime += framework::MilliSecTime(100);
+ _delayedRequests.emplace_back(sendTime, req);
+ }
+}
+
+void
+StripeBucketDBUpdater::resendDelayedMessages()
+{
+ if (_pendingClusterState) {
+ _pendingClusterState->resendDelayedMessages();
+ }
+ if (_delayedRequests.empty()) {
+ return; // Don't fetch time if not needed
+ }
+ framework::MilliSecTime currentTime(_node_ctx.clock());
+ while (!_delayedRequests.empty()
+ && currentTime >= _delayedRequests.front().first)
+ {
+ BucketRequest& req(_delayedRequests.front().second);
+ sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
+ _delayedRequests.pop_front();
+ }
+}
+
+void
+StripeBucketDBUpdater::convertBucketInfoToBucketList(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList)
+{
+ for (const auto & entry : repl->getBucketInfo()) {
+ LOG(debug, "Received bucket information from node %u for bucket %s: %s", targetNode,
+ entry._bucketId.toString().c_str(), entry._info.toString().c_str());
+
+ newList.emplace_back(entry._bucketId, entry._info);
+ }
+}
+
+void
+StripeBucketDBUpdater::mergeBucketInfoWithDatabase(
+ const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req)
+{
+ BucketListMerger::BucketList existing;
+ BucketListMerger::BucketList newList;
+
+ findRelatedBucketsInDatabase(req.targetNode, req.bucket, existing);
+ convertBucketInfoToBucketList(repl, req.targetNode, newList);
+
+ std::sort(existing.begin(), existing.end(), sort_pred);
+ std::sort(newList.begin(), newList.end(), sort_pred);
+
+ BucketListMerger merger(newList, existing, req.timestamp);
+ updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger);
+}
+
+bool
+StripeBucketDBUpdater::processSingleBucketInfoReply(
+ const std::shared_ptr<api::RequestBucketInfoReply> & repl)
+{
+ auto iter = _sentMessages.find(repl->getMsgId());
+
+ // Has probably been deleted for some reason earlier.
+ if (iter == _sentMessages.end()) {
+ return true;
+ }
+
+ BucketRequest req = iter->second;
+ _sentMessages.erase(iter);
+
+ if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
+ // Ignore replies from nodes that are down.
+ return true;
+ }
+ if (repl->getResult().getResult() != api::ReturnCode::OK) {
+ handleSingleBucketInfoFailure(repl, req);
+ return true;
+ }
+ mergeBucketInfoWithDatabase(repl, req);
+ return true;
+}
+
+void
+StripeBucketDBUpdater::addBucketInfoForNode(
+ const BucketDatabase::Entry& e,
+ uint16_t node,
+ BucketListMerger::BucketList& existing) const
+{
+ const BucketCopy* copy(e->getNode(node));
+ if (copy) {
+ existing.emplace_back(e.getBucketId(), copy->getBucketInfo());
+ }
+}
+
+void
+StripeBucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing)
+{
+ auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
+ std::vector<BucketDatabase::Entry> entries;
+ distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
+
+ for (const BucketDatabase::Entry & entry : entries) {
+ addBucketInfoForNode(entry, node, existing);
+ }
+}
+
+void
+StripeBucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
+{
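+    // Apply the merger's computed diff: first remove this node from buckets it
+    // no longer reported, then upsert added/changed replica info stamped with
+    // the timestamp of the original request.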
+ for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
+ document::Bucket bucket(bucketSpace, bucketId);
+ _op_ctx.remove_node_from_bucket_database(bucket, node);
+ }
+
+ for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
+ document::Bucket bucket(bucketSpace, entry.first);
+ _op_ctx.update_bucket_database(
+ bucket,
+ BucketCopy(merger.getTimestamp(), node, entry.second),
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
+ }
+}
+
+bool
+StripeBucketDBUpdater::isPendingClusterStateCompleted() const
+{
+ return _pendingClusterState.get() && _pendingClusterState->done();
+}
+
+void
+StripeBucketDBUpdater::processCompletedPendingClusterState()
+{
+ if (_pendingClusterState->isDeferred()) {
+ LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
+ _pendingClusterState->clusterStateVersion());
+ assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+ // Sending down SetSystemState command will reach the state manager and a reply
+ // will be auto-sent back to the cluster controller in charge. Once this happens,
+ // it will send an explicit activation command once all distributors have reported
+ // that their pending cluster states have completed.
+ // A booting distributor will treat itself as "system Up" before the state has actually
+        // taken effect via activation. The external operation handler will keep operations from
+        // actually being scheduled until the state has been activated, and needs to be explicitly
+        // aware of the case where no state has yet been activated.
+ _distributor_interface.getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ _pendingClusterState->clearCommand();
+ return;
+ }
+ // Distribution config change or non-deferred cluster state. Immediately activate
+ // the pending state without being told to do so explicitly.
+ activatePendingClusterState();
+}
+
+void
+StripeBucketDBUpdater::activatePendingClusterState()
+{
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
+
+ _pendingClusterState->mergeIntoBucketDatabases();
+ maybe_inject_simulated_db_merging_delay();
+
+ if (_pendingClusterState->isVersionedTransition()) {
+ LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
+ enableCurrentClusterStateBundleInDistributor();
+ if (_pendingClusterState->hasCommand()) {
+ _distributor_interface.getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ }
+ addCurrentStateToClusterStateHistory();
+ } else {
+ LOG(debug, "Activating pending distribution config");
+ // TODO distribution changes cannot currently be deferred as they are not
+ // initiated by the cluster controller!
+ _distributor_interface.notifyDistributionChangeEnabled();
+ }
+
+ update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
+ _pendingClusterState.reset();
+ _outdatedNodesMap.clear();
+ _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); // TODO also read only bucket space..?
+ sendAllQueuedBucketRechecks();
+ completeTransitionTimer();
+ clearReadOnlyBucketRepoDatabases();
+
+ _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
+ process_timer.getElapsedTimeAsDouble());
+}
+
+void
+StripeBucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
+{
+ const lib::ClusterStateBundle& state(
+ _pendingClusterState->getNewClusterStateBundle());
+
+ LOG(debug,
+ "BucketDBUpdater finished processing state %s",
+ state.getBaselineClusterState()->toString().c_str());
+
+ _distributor_interface.enableClusterStateBundle(state);
+}
+
+void StripeBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
+ update_read_snapshot_after_activation(activated_state);
+ _distributor_interface.enableClusterStateBundle(activated_state);
+}
+
+void
+StripeBucketDBUpdater::addCurrentStateToClusterStateHistory()
+{
+ _history.push_back(_pendingClusterState->getSummary());
+
+ if (_history.size() > 50) {
+ _history.pop_front();
+ }
+}
+
+vespalib::string
+StripeBucketDBUpdater::getReportContentType(const framework::HttpUrlPath&) const
+{
+ return "text/xml";
+}
+
+namespace {
+
+const vespalib::string ALL = "all";
+const vespalib::string BUCKETDB = "bucketdb";
+const vespalib::string BUCKETDB_UPDATER = "Bucket Database Updater";
+
+}
+
+void
+StripeBucketDBUpdater::BucketRequest::print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const
+{
+ using namespace vespalib::xml;
+ xos << XmlTag("storagenode")
+ << XmlAttribute("index", targetNode);
+ xos << XmlAttribute("bucketspace", bucket.getBucketSpace().getId(), XmlAttribute::HEX);
+ if (bucket.getBucketId().getRawId() == 0) {
+ xos << XmlAttribute("bucket", ALL);
+ } else {
+ xos << XmlAttribute("bucket", bucket.getBucketId().getId(), XmlAttribute::HEX);
+ }
+ xos << timestampAttribute << XmlEndTag();
+}
+
+bool
+StripeBucketDBUpdater::reportStatus(std::ostream& out,
+ const framework::HttpUrlPath& path) const
+{
+ using namespace vespalib::xml;
+ XmlOutputStream xos(out);
+ // FIXME(vekterli): have to do this manually since we cannot inherit
+ // directly from XmlStatusReporter due to data races when BucketDBUpdater
+ // gets status requests directly.
+ xos << XmlTag("status")
+ << XmlAttribute("id", BUCKETDB)
+ << XmlAttribute("name", BUCKETDB_UPDATER);
+ reportXmlStatus(xos, path);
+ xos << XmlEndTag();
+ return true;
+}
+
+vespalib::string
+StripeBucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
+ const framework::HttpUrlPath&) const
+{
+ using namespace vespalib::xml;
+ xos << XmlTag("bucketdb")
+ << XmlTag("systemstate_active")
+ << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
+ << XmlEndTag();
+ if (_pendingClusterState) {
+ xos << *_pendingClusterState;
+ }
+ xos << XmlTag("systemstate_history");
+ for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) {
+ xos << XmlTag("change")
+ << XmlAttribute("from", i->_prevClusterState)
+ << XmlAttribute("to", i->_newClusterState)
+ << XmlAttribute("processingtime", i->_processingTime)
+ << XmlEndTag();
+ }
+ xos << XmlEndTag()
+ << XmlTag("single_bucket_requests");
+ for (const auto & entry : _sentMessages)
+ {
+ entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp));
+ }
+ xos << XmlEndTag()
+ << XmlTag("delayed_single_bucket_requests");
+ for (const auto & entry : _delayedRequests)
+ {
+ entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime()));
+ }
+ xos << XmlEndTag() << XmlEndTag();
+ return "";
+}
+
+StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
+ const lib::ClusterState& oldState,
+ const lib::ClusterState& s,
+ uint16_t localIndex,
+ const lib::Distribution& distribution,
+ const char* upStates,
+ bool track_non_owned_entries)
+ : _oldState(oldState),
+ _state(s),
+ _available_nodes(),
+ _nonOwnedBuckets(),
+ _removed_buckets(0),
+ _removed_documents(0),
+ _localIndex(localIndex),
+ _distribution(distribution),
+ _upStates(upStates),
+ _track_non_owned_entries(track_non_owned_entries),
+ _cachedDecisionSuperbucket(UINT64_MAX),
+ _cachedOwned(false)
+{
+ // TODO intersection of cluster state and distribution config
+ const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
+ _available_nodes.resize(storage_count);
+ for (uint16_t i = 0; i < storage_count; ++i) {
+ if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) {
+ _available_nodes[i] = true;
+ }
+ }
+}
+
+void
+StripeBucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
+{
+ LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
+}
+
+namespace {
+
+uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
+ // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
+ return id.getRawId() & ~(UINT64_MAX << distribution_bits);
+}
+
+}
+
+bool
+StripeBucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
+ const document::BucketId& bucketId) const
+{
+ // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
+ // TODO "too few bits used" case can be cheaply checked without needing exception
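+    // Ownership only depends on the superbucket (distribution bit) prefix of
+    // the bucket ID, so the most recent decision is cached and reused for all
+    // buckets sharing the same superbucket.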
+ try {
+ const auto bits = _state.getDistributionBitCount();
+ const auto this_superbucket = superbucket_from_id(bucketId, bits);
+ if (_cachedDecisionSuperbucket == this_superbucket) {
+ if (!_cachedOwned) {
+ logRemove(bucketId, "bucket now owned by another distributor (cached)");
+ }
+ return _cachedOwned;
+ }
+
+ uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim");
+ _cachedDecisionSuperbucket = this_superbucket;
+ _cachedOwned = (distributor == _localIndex);
+ if (!_cachedOwned) {
+ logRemove(bucketId, "bucket now owned by another distributor");
+ return false;
+ }
+ return true;
+ } catch (lib::TooFewBucketBitsInUseException& exc) {
+ logRemove(bucketId, "using too few distribution bits now");
+ } catch (lib::NoDistributorsAvailableException& exc) {
+ logRemove(bucketId, "no distributors are available");
+ }
+ return false;
+}
+
+void
+StripeBucketDBUpdater::MergingNodeRemover::setCopiesInEntry(
+ BucketDatabase::Entry& e,
+ const std::vector<BucketCopy>& copies) const
+{
+ e->clear();
+
+ std::vector<uint16_t> order =
+ _distribution.getIdealStorageNodes(_state, e.getBucketId(), _upStates);
+
+ e->addNodes(copies, order);
+
+ LOG(spam, "Changed %s", e->toString().c_str());
+}
+
+bool
+StripeBucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const
+{
+ const uint16_t n_nodes = e->getNodeCount();
+ for (uint16_t i = 0; i < n_nodes; i++) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (!storage_node_is_available(node_idx)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+BucketDatabase::MergingProcessor::Result
+StripeBucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger)
+{
+ document::BucketId bucketId(merger.bucket_id());
+ LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str());
+ if (!distributorOwnsBucket(bucketId)) {
+ // TODO remove in favor of DB snapshotting
+ if (_track_non_owned_entries) {
+ _nonOwnedBuckets.emplace_back(merger.current_entry());
+ }
+ return Result::Skip;
+ }
+ auto& e = merger.current_entry();
+
+ if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger?
+ return Result::Skip;
+ }
+
+ if (!has_unavailable_nodes(e)) {
+ return Result::KeepUnchanged;
+ }
+
+ std::vector<BucketCopy> remainingCopies;
+ for (uint16_t i = 0; i < e->getNodeCount(); i++) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (storage_node_is_available(node_idx)) {
+ remainingCopies.push_back(e->getNodeRef(i));
+ }
+ }
+
+ if (remainingCopies.empty()) {
+ ++_removed_buckets;
+ _removed_documents += e->getHighestDocumentCount();
+ return Result::Skip;
+ } else {
+ setCopiesInEntry(e, remainingCopies);
+ return Result::Update;
+ }
+}
+
+bool
+StripeBucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept
+{
+ return ((index < _available_nodes.size()) && _available_nodes[index]);
+}
+
+StripeBucketDBUpdater::MergingNodeRemover::~MergingNodeRemover()
+{
+ if (_removed_buckets != 0) {
+ LOGBM(info, "After cluster state change %s, %zu buckets no longer "
+ "have available replicas. %zu documents in these buckets will "
+ "be unavailable until nodes come back up",
+ _oldState.getTextualDifference(_state).c_str(),
+ _removed_buckets, _removed_documents);
+ }
+}
+
+} // distributor
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
new file mode 100644
index 00000000000..e34d28a69bc
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -0,0 +1,284 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "bucketlistmerger.h"
+#include "distributor_stripe_component.h"
+#include "distributormessagesender.h"
+#include "messageguard.h"
+#include "operation_routing_snapshot.h"
+#include "outdated_nodes_map.h"
+#include "pendingclusterstate.h"
+#include "potential_data_loss_report.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/messageapi/messagehandler.h>
+#include <vespa/storageframework/generic/clock/timer.h>
+#include <vespa/storageframework/generic/status/statusreporter.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <atomic>
+#include <deque>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <unordered_map>
+
+namespace vespalib::xml {
+class XmlOutputStream;
+class XmlAttribute;
+}
+
+namespace storage::distributor {
+
+class DistributorStripeInterface;
+class BucketSpaceDistributionContext;
+
+class StripeBucketDBUpdater final
+ : public framework::StatusReporter,
+ public api::MessageHandler
+{
+public:
+ using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
+ StripeBucketDBUpdater(DistributorStripeInterface& owner,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ DistributorMessageSender& sender,
+ DistributorComponentRegister& compReg);
+ ~StripeBucketDBUpdater() override;
+
+ void flush();
+ const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const;
+ void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
+
+ bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
+ bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
+ bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
+ bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
+ void resendDelayedMessages();
+ void storageDistributionChanged();
+
+ vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
+ vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
+ bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+ void print(std::ostream& out, bool verbose, const std::string& indent) const;
+ const DistributorNodeContext& node_context() const { return _node_ctx; }
+ DistributorOperationContext& operation_context() { return _op_ctx; }
+
+ /**
+ * Returns whether the current PendingClusterState indicates that there has
+ * been a transfer of bucket ownership amongst the distributors in the
+     * cluster. This method only makes sense to call while _pendingClusterState
+     * is active, such as from within an enableClusterState() call.
+ */
+ bool bucketOwnershipHasChanged() const {
+ return ((_pendingClusterState.get() != nullptr)
+ && _pendingClusterState->hasBucketOwnershipTransfer());
+ }
+ void set_stale_reads_enabled(bool enabled) noexcept {
+ _stale_reads_enabled.store(enabled, std::memory_order_relaxed);
+ }
+ bool stale_reads_enabled() const noexcept {
+ return _stale_reads_enabled.load(std::memory_order_relaxed);
+ }
+
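+    // Returns a snapshot for routing operations to the given bucket, pinning a
+    // bucket DB read guard so the resolved DB view stays consistent while the
+    // snapshot is alive. May resolve to the read-only DB during state transitions.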
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
+private:
+ class MergeReplyGuard {
+ public:
+ MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
+ : _distributor_interface(distributor_interface), _reply(reply) {}
+
+ ~MergeReplyGuard();
+
+ // Used when we're flushing and simply want to drop the reply rather
+ // than send it down
+ void resetReply() { _reply.reset(); }
+ private:
+ DistributorStripeInterface& _distributor_interface;
+ std::shared_ptr<api::MergeBucketReply> _reply;
+ };
+
+ struct BucketRequest {
+ BucketRequest()
+            : targetNode(0), bucket(), timestamp(0) {}
+
+ BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
+ const std::shared_ptr<MergeReplyGuard>& guard)
+ : targetNode(t),
+ bucket(b),
+ timestamp(currentTime),
+              _mergeReplyGuard(guard) {}
+
+ void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const;
+ uint16_t targetNode;
+ document::Bucket bucket;
+ uint64_t timestamp;
+
+ std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
+ };
+
+ struct EnqueuedBucketRecheck {
+ uint16_t node;
+ document::Bucket bucket;
+
+ EnqueuedBucketRecheck() : node(0), bucket() {}
+
+ EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket)
+ : node(_node),
+ bucket(_bucket)
+ {}
+
+ bool operator<(const EnqueuedBucketRecheck& o) const {
+ if (node != o.node) {
+ return node < o.node;
+ }
+ return bucket < o.bucket;
+ }
+ bool operator==(const EnqueuedBucketRecheck& o) const {
+ return node == o.node && bucket == o.bucket;
+ }
+ };
+
+ friend class DistributorTestUtil;
+ // TODO refactor and rewire to avoid needing this direct meddling
+ friend class LegacySingleStripeAccessGuard;
+ // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
+ // components agree on the currently active cluster state bundle.
+ // Transitively invokes Distributor::enableClusterStateBundle
+ void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
+
+ bool shouldDeferStateEnabling() const noexcept;
+ bool hasPendingClusterState() const;
+ bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ bool isPendingClusterStateCompleted() const;
+ void processCompletedPendingClusterState();
+ void activatePendingClusterState();
+ void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList);
+ void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReply);
+ void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
+ BucketListMerger::BucketList& existing) const;
+ void ensureTransitionTimerStarted();
+ void completeTransitionTimer();
+ void clearReadOnlyBucketRepoDatabases();
+ /**
+     * Adds all buckets in the bucket database that either are contained in
+     * the given bucket or contain it, and that have replicas on the given node.
+ */
+ void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
+ BucketListMerger::BucketList& existing);
+
+    /**
+     * Updates the bucket database from the information generated by the given
+     * bucket list merger.
+     */
+ void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
+
+ void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+
+ void update_read_snapshot_before_db_pruning();
+ void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
+ bool is_distribution_config_change);
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
+
+ // TODO STRIPE only called when stripe guard is held
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change);
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries);
+
+ void replyToPreviousPendingClusterStateIfAny();
+ void replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion);
+
+ void enableCurrentClusterStateBundleInDistributor();
+ void addCurrentStateToClusterStateHistory();
+ void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
+ void sendAllQueuedBucketRechecks();
+
+ void maybe_inject_simulated_db_pruning_delay();
+ void maybe_inject_simulated_db_merging_delay();
+
+    /**
+     * Removes all bucket replicas residing on nodes that are down.
+     */
+ class MergingNodeRemover : public BucketDatabase::MergingProcessor {
+ public:
+ MergingNodeRemover(const lib::ClusterState& oldState,
+ const lib::ClusterState& s,
+ uint16_t localIndex,
+ const lib::Distribution& distribution,
+ const char* upStates,
+ bool track_non_owned_entries);
+ ~MergingNodeRemover() override;
+
+ Result merge(BucketDatabase::Merger&) override;
+ void logRemove(const document::BucketId& bucketId, const char* msg) const;
+ bool distributorOwnsBucket(const document::BucketId&) const;
+
+ const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
+ return _nonOwnedBuckets;
+ }
+ size_t removed_buckets() const noexcept { return _removed_buckets; }
+ size_t removed_documents() const noexcept { return _removed_documents; }
+ private:
+ void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
+
+ bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
+ bool storage_node_is_available(uint16_t index) const noexcept;
+
+ const lib::ClusterState _oldState;
+ const lib::ClusterState _state;
+ std::vector<bool> _available_nodes;
+ std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
+ size_t _removed_buckets;
+ size_t _removed_documents;
+
+ uint16_t _localIndex;
+ const lib::Distribution& _distribution;
+ const char* _upStates;
+ bool _track_non_owned_entries;
+
+ mutable uint64_t _cachedDecisionSuperbucket;
+ mutable bool _cachedOwned;
+ };
+
+ DistributorStripeComponent _distributorComponent;
+ const DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
+ DistributorStripeInterface& _distributor_interface;
+ std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
+ std::map<uint64_t, BucketRequest> _sentMessages;
+ std::unique_ptr<PendingClusterState> _pendingClusterState;
+ std::list<PendingClusterState::Summary> _history;
+ DistributorMessageSender& _sender;
+ std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
+ OutdatedNodesMap _outdatedNodesMap;
+ framework::MilliSecTimer _transitionTimer;
+ std::atomic<bool> _stale_reads_enabled;
+ using DistributionContexts = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketSpaceDistributionContext>,
+ document::BucketSpace::hash>;
+ DistributionContexts _active_distribution_contexts;
+ using DbGuards = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketDatabase::ReadGuard>,
+ document::BucketSpace::hash>;
+ DbGuards _explicit_transition_read_guard;
+ mutable std::mutex _distribution_context_mutex;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index fbbc7c366fd..367a325939e 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -107,6 +107,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
builder.add(std::move(stateManager));
}
+// FIXME STRIPE not thread safe!!
api::Timestamp
DistributorNode::getUniqueTimestamp()
{