authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-01 11:09:36 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-01 12:20:51 +0000
commite368bcb60712305139377a97196b68f664aa2164 (patch)
treef9ca8888a4cb84f5aa105dd1e012ef8ce119f50d /storage
parent7295ea63246d69dc0b4c434c6f1773b8b5f8b5c4 (diff)
Add support for snapshotting all state required for routing a bucket operation
Let BucketDBUpdater expose a snapshotting function that handles database routing based on the requested bucket and any pending cluster state transition.
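
As a rough sketch of the intended call pattern (a hypothetical caller, not part of this commit; the helper name and the replica-resolution step are assumptions), an operation could consult the new snapshot API like this:

    #include <vespa/storage/distributor/bucketdbupdater.h>
    #include <vector>

    namespace storage::distributor {

    // Hypothetical caller; only read_snapshot_for_bucket(), is_routable(),
    // steal_read_guard() and find_parents_and_self() are APIs from this change.
    void route_or_bounce(const BucketDBUpdater& updater, const document::Bucket& bucket) {
        auto snapshot = updater.read_snapshot_for_bucket(bucket);
        if (!snapshot.is_routable()) {
            return; // Not owned in the active state, or ownership lost in the pending state with stale reads disabled.
        }
        // The guard pins a consistent view of either the mutable or the read-only
        // bucket DB, depending on whether the bucket is still owned in the pending state.
        auto guard = snapshot.steal_read_guard();
        std::vector<BucketDatabase::Entry> entries;
        guard->find_parents_and_self(bucket.getBucketId(), entries);
        // ... resolve replicas from entries using snapshot.context()'s cluster state ...
    }

    }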
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/bucketdbupdatertest.cpp                   | 155
-rw-r--r--  storage/src/vespa/storage/distributor/CMakeLists.txt                    |   2
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.cpp               |  99
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.h                 |  24
-rw-r--r--  storage/src/vespa/storage/distributor/cluster_distribution_context.cpp  |  79
-rw-r--r--  storage/src/vespa/storage/distributor/cluster_distribution_context.h    |  67
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.cpp                   |   1
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_bucket_space.h        |   6
-rw-r--r--  storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp    |  27
-rw-r--r--  storage/src/vespa/storage/distributor/operation_routing_snapshot.h      |  41
10 files changed, 490 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 321b0cc3bba..30b5256cf13 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -2,11 +2,10 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
+#include <vespa/storage/distributor/cluster_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
-#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -124,7 +123,7 @@ public:
createLinks();
_bucketSpaces = getBucketSpaces();
// Disable deferred activation by default (at least for now) to avoid breaking the entire world.
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+ getBucketDBUpdater().set_stale_reads_enabled(false);
};
void TearDown() override {
@@ -2415,7 +2414,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
}
TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ getBucketDBUpdater().set_stale_reads_enabled(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
@@ -2468,7 +2467,7 @@ TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_on
}
TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+ getBucketDBUpdater().set_stale_reads_enabled(false);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
@@ -2497,7 +2496,6 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
uint32_t pending_buckets,
uint32_t pending_expected_msgs)
{
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state(initial_state_str);
setSystemState(initial_state);
ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size());
@@ -2514,6 +2512,7 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
}
TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2533,6 +2532,7 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until
}
TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2544,6 +2544,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated
}
TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2557,6 +2558,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_d
}
TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
@@ -2570,6 +2572,7 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_vers
}
TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2727,4 +2730,144 @@ TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_s
EXPECT_TRUE(state == nullptr);
}
+struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest {
+ lib::ClusterState empty_state;
+ std::shared_ptr<lib::ClusterState> initial_baseline;
+ std::shared_ptr<lib::ClusterState> initial_default;
+ lib::ClusterStateBundle initial_bundle;
+ Bucket default_bucket;
+ Bucket global_bucket;
+
+ BucketDBUpdaterSnapshotTest()
+ : BucketDBUpdaterTest(),
+ empty_state(),
+ initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
+ initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")),
+ initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+ {FixedBucketSpaces::global_space(), initial_baseline}}),
+ default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)),
+ global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
+ {
+ }
+ ~BucketDBUpdaterSnapshotTest() override;
+
+ void SetUp() override {
+ BucketDBUpdaterTest::SetUp();
+ getBucketDBUpdater().set_stale_reads_enabled(true);
+ };
+
+ // Assumes that the distributor owns all buckets, so it may choose an arbitrary bucket in the bucket space
+ uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) {
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
+ if (!def_rs.is_routable()) {
+ return 0;
+ }
+ auto guard = def_rs.steal_read_guard();
+ uint32_t found_buckets = 0;
+ for_each_bucket(repo, [&](const auto& space, const auto& entry) {
+ if (space == bucket_space) {
+ std::vector<BucketDatabase::Entry> entries;
+ guard->find_parents_and_self(entry.getBucketId(), entries);
+ if (entries.size() == 1) {
+ ++found_buckets;
+ }
+ }
+ });
+ return found_buckets;
+ }
+};
+
+BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+
+TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+ set_cluster_state_bundle(initial_bundle);
+ // State currently pending, empty initial state is active
+
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(def_rs.context().has_pending_state_transition());
+ EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString());
+
+ auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(global_rs.context().has_pending_state_transition());
+ EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString());
+
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0));
+ // State now activated, no pending
+
+ def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_FALSE(def_rs.context().has_pending_state_transition());
+
+ global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_FALSE(global_rs.context().has_pending_state_transition());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()),
+ n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()),
+ n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ // We're down in state 2 and therefore do not own any buckets
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()),
+ n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()),
+ n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+ getBucketDBUpdater().set_stale_reads_enabled(false);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
+
+/*
+ * TODO test
+ * - is_routable for pending/no pending
+ * - explicit guard (how?)
+ * - snapshots for pending/no pending
+ */
+
}
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 8c701033e67..01a4b72d411 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_library(storage_distributor
bucket_db_prune_elision.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
+ cluster_distribution_context.cpp
clusterinformation.cpp
distributor_bucket_space.cpp
distributor_bucket_space_repo.cpp
@@ -20,6 +21,7 @@ vespa_add_library(storage_distributor
idealstatemetricsset.cpp
messagetracker.cpp
nodeinfo.cpp
+ operation_routing_snapshot.cpp
operation_sequencer.cpp
operationowner.cpp
operationtargetresolver.cpp
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index a901ac28a54..b15bda7b4ca 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,6 +2,7 @@
#include "bucketdbupdater.h"
#include "bucket_db_prune_elision.h"
+#include "cluster_distribution_context.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
@@ -30,12 +31,43 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
_sender(sender),
- _transitionTimer(_distributorComponent.getClock())
+ _transitionTimer(_distributorComponent.getClock()),
+ _active_distribution_contexts(),
+ _distribution_context_mutex()
{
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ _active_distribution_contexts.emplace(
+ elem.first,
+ ClusterDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex()));
+ _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
+ }
}
BucketDBUpdater::~BucketDBUpdater() = default;
+OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ std::lock_guard lock(_distribution_context_mutex);
+ auto active_state_iter = _active_distribution_contexts.find(bucket.getBucketSpace());
+ assert(active_state_iter != _active_distribution_contexts.cend());
+ auto& state = *active_state_iter->second;
+ if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId());
+ if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const auto& space = bucket_present_in_mutable_db
+ ? _distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace())
+ : _distributorComponent.getReadOnlyBucketSpaceRepo().get(bucket.getBucketSpace());
+ auto existing_guard_iter = _explicit_transition_read_guard.find(bucket.getBucketSpace());
+ assert(existing_guard_iter != _explicit_transition_read_guard.cend());
+ auto db_guard = existing_guard_iter->second
+ ? existing_guard_iter->second
+ : space.getBucketDatabase().acquire_read_guard();
+ return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard));
+}
+
void
BucketDBUpdater::flush()
{
@@ -59,8 +91,7 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden
bool
BucketDBUpdater::shouldDeferStateEnabling() const noexcept
{
- return _distributorComponent.getDistributor().getConfig()
- .allowStaleReadsDuringClusterStateTransitions();
+ return stale_reads_enabled();
}
bool
@@ -258,6 +289,61 @@ BucketDBUpdater::replyToActivationWithActualVersion(
_distributorComponent.sendUp(reply); // TODO let API accept rvalues
}
+void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
+ std::lock_guard lock(_distribution_context_mutex);
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ // At this point, we're still operating with a distribution context _without_ a
+ // pending state, i.e. anyone using the context will expect to find buckets
+ // in the DB that correspond to how the database looked like prior to pruning
+ // buckets from the DB. To ensure this is not violated, take a snapshot of the
+ // _mutable_ DB and expose this. This snapshot only lives until we atomically
+ // flip to expose a distribution context that includes the new, pending state.
+ // At that point, the read-only DB is known to contain the buckets that have
+ // been pruned away, so we can release the mutable DB snapshot safely.
+ // TODO test for, and handle, state preemption case!
+ _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
+ }
+}
+
+
+void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto old_baseline_state = _distributorComponent.getBucketSpaceRepo().get(
+ document::FixedBucketSpaces::default_space()).cluster_state_sp();
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto old_cluster_state = elem.second->cluster_state_sp();
+ auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ ClusterDistributionContext::make_state_transition(
+ std::move(old_cluster_state),
+ old_baseline_state,
+ std::move(new_cluster_state),
+ std::move(new_distribution),
+ _distributorComponent.getIndex()));
+ // We can now remove the explicit mutable DB snapshot, as the buckets that have been
+ // pruned away are visible in the read-only DB.
+ _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
+ }
+}
+
+void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto& baseline_cluster_state = activated_state.getBaselineClusterState();
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ ClusterDistributionContext::make_stable_state(
+ std::move(new_cluster_state),
+ baseline_cluster_state,
+ std::move(new_distribution),
+ _distributorComponent.getIndex()));
+ }
+}
+
bool
BucketDBUpdater::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& cmd)
@@ -275,8 +361,10 @@ BucketDBUpdater::onSetSystemState(
ensureTransitionTimerStarted();
// Separate timer since _transitionTimer might span multiple pending states.
framework::MilliSecTimer process_timer(_distributorComponent.getClock());
-
- removeSuperfluousBuckets(cmd->getClusterStateBundle(), false);
+ update_read_snapshot_before_db_pruning();
+ const auto& bundle = cmd->getClusterStateBundle();
+ removeSuperfluousBuckets(bundle, false);
+ update_read_snapshot_after_db_pruning(bundle);
replyToPreviousPendingClusterStateIfAny();
ClusterInformation::CSP clusterInfo(
@@ -642,6 +730,7 @@ BucketDBUpdater::activatePendingClusterState()
_distributorComponent.getDistributor().notifyDistributionChangeEnabled();
}
+ update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
_pendingClusterState.reset();
_outdatedNodesMap.clear();
sendAllQueuedBucketRechecks();
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index e69d328d8bc..7de93774f22 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -6,6 +6,7 @@
#include "distributorcomponent.h"
#include "distributormessagesender.h"
#include "pendingclusterstate.h"
+#include "operation_routing_snapshot.h"
#include "outdated_nodes_map.h"
#include <vespa/document/bucket/bucket.h>
#include <vespa/storageapi/messageapi/returncode.h>
@@ -15,7 +16,9 @@
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageframework/generic/status/statusreporter.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
+#include <atomic>
#include <list>
+#include <mutex>
namespace vespalib::xml {
class XmlOutputStream;
@@ -25,6 +28,7 @@ class XmlAttribute;
namespace storage::distributor {
class Distributor;
+class ClusterDistributionContext;
class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
@@ -70,7 +74,14 @@ public:
return ((_pendingClusterState.get() != nullptr)
&& _pendingClusterState->hasBucketOwnershipTransfer());
}
+ void set_stale_reads_enabled(bool enabled) noexcept {
+ _stale_reads_enabled.store(enabled, std::memory_order_relaxed);
+ }
+ bool stale_reads_enabled() const noexcept {
+ return _stale_reads_enabled.load(std::memory_order_relaxed);
+ }
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
private:
DistributorComponent _distributorComponent;
class MergeReplyGuard {
@@ -166,8 +177,11 @@ private:
void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+ void update_read_snapshot_before_db_pruning();
void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
bool is_distribution_config_change);
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
void replyToPreviousPendingClusterStateIfAny();
void replyToActivationWithActualVersion(
@@ -235,6 +249,16 @@ private:
std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
OutdatedNodesMap _outdatedNodesMap;
framework::MilliSecTimer _transitionTimer;
+ std::atomic<bool> _stale_reads_enabled;
+ using DistributionContexts = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<ClusterDistributionContext>,
+ document::BucketSpace::hash>;
+ DistributionContexts _active_distribution_contexts;
+ using DbGuards = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketDatabase::ReadGuard>,
+ document::BucketSpace::hash>;
+ DbGuards _explicit_transition_read_guard;
+ mutable std::mutex _distribution_context_mutex;
};
}
diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp b/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp
new file mode 100644
index 00000000000..6ec7395c3f5
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp
@@ -0,0 +1,79 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cluster_distribution_context.h"
+
+namespace storage::distributor {
+
+ClusterDistributionContext::~ClusterDistributionContext() = default;
+
+ClusterDistributionContext::ClusterDistributionContext(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index)
+ : _active_cluster_state(std::move(active_cluster_state)),
+ _baseline_active_cluster_state(std::move(baseline_active_cluster_state)),
+ _pending_cluster_state(std::move(pending_cluster_state)),
+ _distribution(std::move(distribution)),
+ _this_node_index(this_node_index)
+{}
+
+std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_state_transition(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index)
+{
+ return std::make_shared<ClusterDistributionContext>(
+ std::move(active_cluster_state), std::move(baseline_active_cluster_state),
+ std::move(pending_cluster_state), std::move(distribution),
+ this_node_index);
+}
+
+std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_stable_state(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index)
+{
+ return std::make_shared<ClusterDistributionContext>(
+ std::move(active_cluster_state), std::move(baseline_active_cluster_state),
+ std::shared_ptr<const lib::ClusterState>(),
+ std::move(distribution), this_node_index);
+}
+
+std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_not_yet_initialized(uint16_t this_node_index) {
+ return std::make_shared<ClusterDistributionContext>(
+ std::make_shared<const lib::ClusterState>(),
+ std::make_shared<const lib::ClusterState>(),
+ std::shared_ptr<const lib::ClusterState>(),
+ std::make_shared<const lib::Distribution>(),
+ this_node_index);
+}
+
+bool ClusterDistributionContext::bucket_owned_in_state(const lib::ClusterState& state,
+ const document::BucketId& id) const
+{
+ try {
+ uint16_t owner_idx = _distribution->getIdealDistributorNode(state, id);
+ return (owner_idx == _this_node_index);
+ } catch (lib::TooFewBucketBitsInUseException&) {
+ return false;
+ } catch (lib::NoDistributorsAvailableException&) {
+ return false;
+ }
+}
+
+bool ClusterDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const {
+ return bucket_owned_in_state(*_active_cluster_state, id);
+}
+
+bool ClusterDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const {
+ if (_pending_cluster_state) {
+ return bucket_owned_in_state(*_pending_cluster_state, id);
+ }
+ return true; // No pending state, owned by default.
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.h b/storage/src/vespa/storage/distributor/cluster_distribution_context.h
new file mode 100644
index 00000000000..f56a3f37cfa
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cluster_distribution_context.h
@@ -0,0 +1,67 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <memory>
+#include <cstdint>
+
+namespace storage::distributor {
+
+/**
+ * TODO desc
+ * TODO rename to be bucket space specific?
+ */
+class ClusterDistributionContext {
+ std::shared_ptr<const lib::ClusterState> _active_cluster_state;
+ std::shared_ptr<const lib::ClusterState> _baseline_active_cluster_state;
+ std::shared_ptr<const lib::ClusterState> _pending_cluster_state; // May be null if no state is pending
+ std::shared_ptr<const lib::Distribution> _distribution; // TODO ideally should have a pending distribution as well
+ uint16_t _this_node_index;
+public:
+ // Public due to make_shared, prefer factory functions instead.
+ ClusterDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+
+ ClusterDistributionContext() = delete;
+ ~ClusterDistributionContext();
+
+ static std::shared_ptr<ClusterDistributionContext> make_state_transition(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+ static std::shared_ptr<ClusterDistributionContext> make_stable_state(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+ static std::shared_ptr<ClusterDistributionContext> make_not_yet_initialized(uint16_t this_node_index);
+
+ const std::shared_ptr<const lib::ClusterState>& active_cluster_state() const noexcept {
+ return _active_cluster_state;
+ }
+
+ const std::shared_ptr<const lib::ClusterState>& baseline_active_cluster_state() const noexcept {
+ return _baseline_active_cluster_state;
+ }
+ bool has_pending_state_transition() const noexcept {
+ return (_pending_cluster_state.get() != nullptr);
+ }
+ // Returned shared_ptr is nullptr iff has_pending_state_transition() == false.
+ const std::shared_ptr<const lib::ClusterState>& pending_cluster_state() const noexcept {
+ return _pending_cluster_state;
+ }
+
+ bool bucket_owned_in_state(const lib::ClusterState& state, const document::BucketId& id) const;
+ bool bucket_owned_in_active_state(const document::BucketId& id) const;
+ bool bucket_owned_in_pending_state(const document::BucketId& id) const;
+
+ uint16_t this_node_index() const noexcept { return _this_node_index; }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 69b64ac8dc1..5e2cd42bb46 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -834,6 +834,7 @@ Distributor::enableNextConfig()
_bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode());
_ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew());
_pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration());
+ _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions());
}
void
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
index 26a0ee9098c..8fbb99dfe89 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
@@ -48,6 +48,9 @@ public:
void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState);
const lib::ClusterState &getClusterState() const noexcept { return *_clusterState; }
+ const std::shared_ptr<const lib::ClusterState>& cluster_state_sp() const noexcept {
+ return _clusterState;
+ }
void setDistribution(std::shared_ptr<const lib::Distribution> distribution);
@@ -55,6 +58,9 @@ public:
const lib::Distribution& getDistribution() const noexcept {
return *_distribution;
}
+ const std::shared_ptr<const lib::Distribution>& distribution_sp() const noexcept {
+ return _distribution;
+ }
};
diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
new file mode 100644
index 00000000000..8300f715cac
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
@@ -0,0 +1,27 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "operation_routing_snapshot.h"
+
+namespace storage::distributor {
+
+OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard)
+ : _context(std::move(context)),
+ _read_guard(std::move(read_guard))
+{}
+
+OperationRoutingSnapshot::~OperationRoutingSnapshot() = default;
+
+OperationRoutingSnapshot OperationRoutingSnapshot::make_not_routable_in_state(
+ std::shared_ptr<ClusterDistributionContext> context)
+{
+ return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>());
+}
+
+OperationRoutingSnapshot OperationRoutingSnapshot::make_routable_with_guard(
+ std::shared_ptr<ClusterDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard)
+{
+ return OperationRoutingSnapshot(std::move(context), std::move(read_guard));
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
new file mode 100644
index 00000000000..0bd8c5d928e
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
@@ -0,0 +1,41 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/bucketdb/bucketdatabase.h>
+#include <memory>
+
+namespace storage::distributor {
+
+class ClusterDistributionContext;
+
+/**
+ * TODO desc
+ */
+class OperationRoutingSnapshot {
+ std::shared_ptr<ClusterDistributionContext> _context;
+ std::shared_ptr<BucketDatabase::ReadGuard> _read_guard;
+public:
+ OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard);
+
+ static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<ClusterDistributionContext> context);
+ static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<ClusterDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard);
+
+ OperationRoutingSnapshot(const OperationRoutingSnapshot&) noexcept = default;
+ OperationRoutingSnapshot& operator=(const OperationRoutingSnapshot&) noexcept = default;
+ OperationRoutingSnapshot(OperationRoutingSnapshot&&) noexcept = default;
+ OperationRoutingSnapshot& operator=(OperationRoutingSnapshot&&) noexcept = default;
+
+ ~OperationRoutingSnapshot();
+
+ const ClusterDistributionContext& context() const noexcept { return *_context; }
+ std::shared_ptr<BucketDatabase::ReadGuard> steal_read_guard() noexcept {
+ return std::move(_read_guard);
+ }
+ bool is_routable() const noexcept {
+ return (_read_guard.get() != nullptr);
+ }
+};
+
+}