summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-10-02 14:58:16 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-10-03 09:14:48 +0000
commit b2c66836548ffa4aabc380195026fdbd844d40e7 (patch)
tree 7dd2be2a23ab7c4384ecdec02b26cb4f2e651c9a /storage
parent e368bcb60712305139377a97196b68f664aa2164 (diff)
Rewrite Get operation starting to use explicit snapshotting
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp24
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp3
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp (renamed from storage/src/vespa/storage/distributor/cluster_distribution_context.cpp)40
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_distribution_context.h (renamed from storage/src/vespa/storage/distributor/cluster_distribution_context.h)43
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h13
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributorinterface.h3
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp50
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h9
-rw-r--r--storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp19
-rw-r--r--storage/src/vespa/storage/distributor/operation_routing_snapshot.h37
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h4
17 files changed, 185 insertions, 114 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 30b5256cf13..8409bd60986 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -2,7 +2,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
-#include <vespa/storage/distributor/cluster_distribution_context.h>
+#include <vespa/storage/distributor/bucket_space_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
@@ -2758,11 +2758,11 @@ struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest {
// Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space
uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) {
- auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
- if (!def_rs.is_routable()) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
+ if (!rs.is_routable()) {
return 0;
}
- auto guard = def_rs.steal_read_guard();
+ auto guard = rs.steal_read_guard();
uint32_t found_buckets = 0;
for_each_bucket(repo, [&](const auto& space, const auto& entry) {
if (space == bucket_space) {
@@ -2795,13 +2795,13 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st
auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString());
- EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString());
ASSERT_TRUE(def_rs.context().has_pending_state_transition());
EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString());
auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString());
- EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString());
ASSERT_TRUE(global_rs.context().has_pending_state_transition());
EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString());
@@ -2810,12 +2810,12 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st
def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString());
- EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString());
EXPECT_FALSE(def_rs.context().has_pending_state_transition());
global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString());
- EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString());
EXPECT_FALSE(global_rs.context().has_pending_state_transition());
}
@@ -2862,12 +2862,4 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}
-
-/*
- * TODO test
- * - is_routable for pending/no pending
- * - explicit guard (how?)
- * - snapshots for pending/no pending
- */
-
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 4a9ef147741..15820b64ff9 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -417,7 +417,8 @@ DistributorTestUtil::getBucketSpaces() const
void
DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state)
{
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(lib::ClusterState(state)));
+ getBucketDBUpdater().simulate_cluster_state_bundle_activation(
+ lib::ClusterStateBundle(lib::ClusterState(state)));
}
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 600e56faf31..84f7d34d069 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -505,6 +505,7 @@ document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_tr
std::string current = "version:123 distributor:2 storage:2";
std::string pending = "version:321 distributor:3 storage:3";
setupDistributor(1, 3, current);
+ getBucketDBUpdater().set_stale_reads_enabled(read_only_enabled);
getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled);
// Trigger pending cluster state
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 01a4b72d411..944df6e1708 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -7,7 +7,7 @@ vespa_add_library(storage_distributor
bucket_db_prune_elision.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
- cluster_distribution_context.cpp
+ bucket_space_distribution_context.cpp
clusterinformation.cpp
distributor_bucket_space.cpp
distributor_bucket_space_repo.cpp
diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp
index 6ec7395c3f5..53040bc42b1 100644
--- a/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp
@@ -1,50 +1,52 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "cluster_distribution_context.h"
+#include "bucket_space_distribution_context.h"
namespace storage::distributor {
-ClusterDistributionContext::~ClusterDistributionContext() = default;
+BucketSpaceDistributionContext::~BucketSpaceDistributionContext() = default;
-ClusterDistributionContext::ClusterDistributionContext(
+BucketSpaceDistributionContext::BucketSpaceDistributionContext(
std::shared_ptr<const lib::ClusterState> active_cluster_state,
- std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
std::shared_ptr<const lib::ClusterState> pending_cluster_state,
std::shared_ptr<const lib::Distribution> distribution,
uint16_t this_node_index)
: _active_cluster_state(std::move(active_cluster_state)),
- _baseline_active_cluster_state(std::move(baseline_active_cluster_state)),
+ _default_active_cluster_state(std::move(default_active_cluster_state)),
_pending_cluster_state(std::move(pending_cluster_state)),
_distribution(std::move(distribution)),
_this_node_index(this_node_index)
{}
-std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_state_transition(
+std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_state_transition(
std::shared_ptr<const lib::ClusterState> active_cluster_state,
- std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
std::shared_ptr<const lib::ClusterState> pending_cluster_state,
std::shared_ptr<const lib::Distribution> distribution,
uint16_t this_node_index)
{
- return std::make_shared<ClusterDistributionContext>(
- std::move(active_cluster_state), std::move(baseline_active_cluster_state),
+ return std::make_shared<BucketSpaceDistributionContext>(
+ std::move(active_cluster_state), std::move(default_active_cluster_state),
std::move(pending_cluster_state), std::move(distribution),
this_node_index);
}
-std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_stable_state(
+std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_stable_state(
std::shared_ptr<const lib::ClusterState> active_cluster_state,
- std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
std::shared_ptr<const lib::Distribution> distribution,
uint16_t this_node_index)
{
- return std::make_shared<ClusterDistributionContext>(
- std::move(active_cluster_state), std::move(baseline_active_cluster_state),
+ return std::make_shared<BucketSpaceDistributionContext>(
+ std::move(active_cluster_state), std::move(default_active_cluster_state),
std::shared_ptr<const lib::ClusterState>(),
std::move(distribution), this_node_index);
}
-std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_not_yet_initialized(uint16_t this_node_index) {
- return std::make_shared<ClusterDistributionContext>(
+std::shared_ptr<BucketSpaceDistributionContext>
+BucketSpaceDistributionContext::make_not_yet_initialized(uint16_t this_node_index)
+{
+ return std::make_shared<BucketSpaceDistributionContext>(
std::make_shared<const lib::ClusterState>(),
std::make_shared<const lib::ClusterState>(),
std::shared_ptr<const lib::ClusterState>(),
@@ -52,8 +54,8 @@ std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_not
this_node_index);
}
-bool ClusterDistributionContext::bucket_owned_in_state(const lib::ClusterState& state,
- const document::BucketId& id) const
+bool BucketSpaceDistributionContext::bucket_owned_in_state(const lib::ClusterState& state,
+ const document::BucketId& id) const
{
try {
uint16_t owner_idx = _distribution->getIdealDistributorNode(state, id);
@@ -65,11 +67,11 @@ bool ClusterDistributionContext::bucket_owned_in_state(const lib::ClusterState&
}
}
-bool ClusterDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const {
+bool BucketSpaceDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const {
return bucket_owned_in_state(*_active_cluster_state, id);
}
-bool ClusterDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const {
+bool BucketSpaceDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const {
if (_pending_cluster_state) {
return bucket_owned_in_state(*_pending_cluster_state, id);
}
diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h
index f56a3f37cfa..7a9c0fcae60 100644
--- a/storage/src/vespa/storage/distributor/cluster_distribution_context.h
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h
@@ -9,45 +9,48 @@
namespace storage::distributor {
/**
- * TODO desc
- * TODO rename to be bucket space specific?
+ * Represents a consistent snapshot of cluster state and distribution config
+ * information at a particular point in time. This is sufficient to compute
+ * bucket ownership and distributions for the bucket space associated with
+ * the context.
+ *
+ * Since this is a snapshot in time, the context is immutable once created.
*/
-class ClusterDistributionContext {
+class BucketSpaceDistributionContext {
std::shared_ptr<const lib::ClusterState> _active_cluster_state;
- std::shared_ptr<const lib::ClusterState> _baseline_active_cluster_state;
+ std::shared_ptr<const lib::ClusterState> _default_active_cluster_state;
std::shared_ptr<const lib::ClusterState> _pending_cluster_state; // May be null if no state is pending
std::shared_ptr<const lib::Distribution> _distribution; // TODO ideally should have a pending distribution as well
uint16_t _this_node_index;
public:
- // Public due to make_shared, prefer factory functions instead.
- ClusterDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state,
- std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
- std::shared_ptr<const lib::ClusterState> pending_cluster_state,
- std::shared_ptr<const lib::Distribution> distribution,
- uint16_t this_node_index);
+ BucketSpaceDistributionContext() = delete;
+ // Public due to make_shared, prefer factory functions to instantiate instead.
+ BucketSpaceDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+ ~BucketSpaceDistributionContext();
- ClusterDistributionContext() = delete;
- ~ClusterDistributionContext();
-
- static std::shared_ptr<ClusterDistributionContext> make_state_transition(
+ static std::shared_ptr<BucketSpaceDistributionContext> make_state_transition(
std::shared_ptr<const lib::ClusterState> active_cluster_state,
- std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
std::shared_ptr<const lib::ClusterState> pending_cluster_state,
std::shared_ptr<const lib::Distribution> distribution,
uint16_t this_node_index);
- static std::shared_ptr<ClusterDistributionContext> make_stable_state(
+ static std::shared_ptr<BucketSpaceDistributionContext> make_stable_state(
std::shared_ptr<const lib::ClusterState> active_cluster_state,
- std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
std::shared_ptr<const lib::Distribution> distribution,
uint16_t this_node_index);
- static std::shared_ptr<ClusterDistributionContext> make_not_yet_initialized(uint16_t this_node_index);
+ static std::shared_ptr<BucketSpaceDistributionContext> make_not_yet_initialized(uint16_t this_node_index);
const std::shared_ptr<const lib::ClusterState>& active_cluster_state() const noexcept {
return _active_cluster_state;
}
- const std::shared_ptr<const lib::ClusterState>& baseline_active_cluster_state() const noexcept {
- return _baseline_active_cluster_state;
+ const std::shared_ptr<const lib::ClusterState>& default_active_cluster_state() const noexcept {
+ return _default_active_cluster_state;
}
bool has_pending_state_transition() const noexcept {
return (_pending_cluster_state.get() != nullptr);
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index b15bda7b4ca..227165a0911 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,7 +2,7 @@
#include "bucketdbupdater.h"
#include "bucket_db_prune_elision.h"
-#include "cluster_distribution_context.h"
+#include "bucket_space_distribution_context.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
@@ -38,7 +38,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
_active_distribution_contexts.emplace(
elem.first,
- ClusterDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex()));
+ BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex()));
_explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
}
}
@@ -46,8 +46,9 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
BucketDBUpdater::~BucketDBUpdater() = default;
OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ const auto bucket_space = bucket.getBucketSpace();
std::lock_guard lock(_distribution_context_mutex);
- auto active_state_iter = _active_distribution_contexts.find(bucket.getBucketSpace());
+ auto active_state_iter = _active_distribution_contexts.find(bucket_space);
assert(active_state_iter != _active_distribution_contexts.cend());
auto& state = *active_state_iter->second;
if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
@@ -57,15 +58,15 @@ OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const documen
if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
}
- const auto& space = bucket_present_in_mutable_db
- ? _distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace())
- : _distributorComponent.getReadOnlyBucketSpaceRepo().get(bucket.getBucketSpace());
- auto existing_guard_iter = _explicit_transition_read_guard.find(bucket.getBucketSpace());
+ const auto& space_repo = bucket_present_in_mutable_db
+ ? _distributorComponent.getBucketSpaceRepo()
+ : _distributorComponent.getReadOnlyBucketSpaceRepo();
+ auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
assert(existing_guard_iter != _explicit_transition_read_guard.cend());
auto db_guard = existing_guard_iter->second
? existing_guard_iter-> second
- : space.getBucketDatabase().acquire_read_guard();
- return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard));
+ : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard();
+ return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo);
}
void
@@ -308,7 +309,7 @@ void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
std::lock_guard lock(_distribution_context_mutex);
- const auto old_baseline_state = _distributorComponent.getBucketSpaceRepo().get(
+ const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get(
document::FixedBucketSpaces::default_space()).cluster_state_sp();
for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
auto new_distribution = elem.second->distribution_sp();
@@ -316,9 +317,9 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt
auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
_active_distribution_contexts.insert_or_assign(
elem.first,
- ClusterDistributionContext::make_state_transition(
+ BucketSpaceDistributionContext::make_state_transition(
std::move(old_cluster_state),
- old_baseline_state,
+ old_default_state,
std::move(new_cluster_state),
std::move(new_distribution),
_distributorComponent.getIndex()));
@@ -330,15 +331,15 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt
void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
std::lock_guard lock(_distribution_context_mutex);
- const auto& baseline_cluster_state = activated_state.getBaselineClusterState();
+ const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
auto new_distribution = elem.second->distribution_sp();
auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
_active_distribution_contexts.insert_or_assign(
elem.first,
- ClusterDistributionContext::make_stable_state(
+ BucketSpaceDistributionContext::make_stable_state(
std::move(new_cluster_state),
- baseline_cluster_state,
+ default_cluster_state,
std::move(new_distribution),
_distributorComponent.getIndex()));
}
@@ -754,6 +755,11 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
_distributorComponent.getDistributor().enableClusterStateBundle(state);
}
+void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
+ update_read_snapshot_after_activation(activated_state);
+ _distributorComponent.getDistributor().enableClusterStateBundle(activated_state);
+}
+
void
BucketDBUpdater::addCurrentStateToClusterStateHistory()
{
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 7de93774f22..86ceab14486 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -28,7 +28,7 @@ class XmlAttribute;
namespace storage::distributor {
class Distributor;
-class ClusterDistributionContext;
+class BucketSpaceDistributionContext;
class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
@@ -140,6 +140,12 @@ private:
}
};
+ friend class DistributorTestUtil;
+ // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
+ // components agree on the currently active cluster state bundle.
+ // Transitively invokes Distributor::enableClusterStateBundle
+ void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
+
bool shouldDeferStateEnabling() const noexcept;
bool hasPendingClusterState() const;
bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
@@ -196,9 +202,6 @@ private:
void maybe_inject_simulated_db_pruning_delay();
void maybe_inject_simulated_db_merging_delay();
- friend class BucketDBUpdater_Test;
- friend class MergeOperation_Test;
-
/**
Removes all copies of buckets that are on nodes that are down.
*/
@@ -251,7 +254,7 @@ private:
framework::MilliSecTimer _transitionTimer;
std::atomic<bool> _stale_reads_enabled;
using DistributionContexts = std::unordered_map<document::BucketSpace,
- std::shared_ptr<ClusterDistributionContext>,
+ std::shared_ptr<BucketSpaceDistributionContext>,
document::BucketSpace::hash>;
DistributionContexts _active_distribution_contexts;
using DbGuards = std::unordered_map<document::BucketSpace,
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 5e2cd42bb46..4adbdd32669 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -77,7 +77,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
- _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg),
+ _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo,
+ _idealStateManager, compReg, use_btree_database),
_threadPool(threadPool),
_initializingIsUp(true),
_doneInitializeHandler(doneInitHandler),
@@ -322,9 +323,7 @@ bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
if (msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply =
- std::dynamic_pointer_cast<api::StorageReply>(msg);
-
+ auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg);
if (handleReply(reply)) {
return true;
}
@@ -400,6 +399,10 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
}
}
+OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ return _bucketDBUpdater.read_snapshot_for_bucket(bucket);
+}
+
void
Distributor::notifyDistributionChangeEnabled()
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 638704adf24..48d9145eec7 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -170,6 +170,8 @@ public:
return *_readOnlyBucketSpaceRepo;
}
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;
+
class Status;
class MetricUpdateHook : public framework::MetricUpdateHook
{
diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h
index d9f037bb8f1..aba58e112dc 100644
--- a/storage/src/vespa/storage/distributor/distributorinterface.h
+++ b/storage/src/vespa/storage/distributor/distributorinterface.h
@@ -4,6 +4,7 @@
#include "bucketgctimecalculator.h"
#include "distributormessagesender.h"
#include "bucketownership.h"
+#include "operation_routing_snapshot.h"
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/document/bucket/bucket.h>
@@ -49,6 +50,8 @@ public:
*/
virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0;
+ virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const = 0;
+
/**
* Returns true if the node is currently initializing.
*/
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index f1f786d84ac..221c516a56e 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_space_distribution_context.h"
#include "externaloperationhandler.h"
#include "distributor.h"
#include <vespa/document/base/documentid.h>
@@ -12,7 +13,6 @@
#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h>
#include <vespa/storage/distributor/operations/external/removelocationoperation.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
-#include <vespa/document/util/stringutil.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
@@ -30,11 +30,16 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator& gen,
- DistributorComponentRegister& compReg)
+ DistributorComponentRegister& compReg,
+ bool enable_concurrent_gets)
: DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"),
_operationGenerator(gen),
- _rejectFeedBeforeTimeReached() // At epoch
-{ }
+ _rejectFeedBeforeTimeReached(), // At epoch
+ _non_main_thread_ops_mutex(),
+ _non_main_thread_ops_owner(owner, getClock()),
+ _enable_concurrent_gets(enable_concurrent_gets)
+{
+}
ExternalOperationHandler::~ExternalOperationHandler() = default;
@@ -78,24 +83,32 @@ void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, cons
sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
}
-void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
+void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd,
+ const lib::ClusterState& cluster_state)
+{
// Distributor ownership is equal across bucket spaces, so always send back default space state.
// This also helps client avoid getting confused by possibly observing different actual
// (derived) state strings for global/non-global document types for the same state version.
// Similarly, if we've yet to activate any version at all we send back BUSY instead
// of a suspiciously empty WrongDistributionReply.
// TODO consider NOT_READY instead of BUSY once we're sure this won't cause any other issues.
- const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
if (cluster_state.getVersion() != 0) {
auto cluster_state_str = cluster_state.toString();
- LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str());
+ LOG(debug, "Got %s with wrong distribution, sending back state '%s'",
+ cmd.toString().c_str(), cluster_state_str.c_str());
bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str));
} else { // Only valid for empty startup state
- LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY");
+ LOG(debug, "Got %s with wrong distribution, but no cluster state activated yet. Sending back BUSY",
+ cmd.toString().c_str());
bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet"));
}
}
+void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
+ const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ bounce_with_wrong_distribution(cmd, cluster_state);
+}
+
void ExternalOperationHandler::bounce_with_busy_during_state_transition(
api::StorageCommand& cmd,
const lib::ClusterState& current_state,
@@ -283,10 +296,23 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
{
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
auto& metrics = getMetrics().gets[cmd->getLoadType()];
- bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) {
- auto& space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
- _op = std::make_shared<GetOperation>(*this, space, space.getBucketDatabase().acquire_read_guard(), cmd, metrics);
- });
+ auto snapshot = getDistributor().read_snapshot_for_bucket(bucket);
+ if (!snapshot.is_routable()) {
+ const auto& ctx = snapshot.context();
+ if (ctx.has_pending_state_transition()) {
+ bounce_with_busy_during_state_transition(*cmd, *ctx.default_active_cluster_state(),
+ *ctx.pending_cluster_state());
+ } else {
+ bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state());
+ metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates
+ }
+ return true;
+ }
+ // The snapshot is aware of whether stale reads are enabled, so we don't have to check that here.
+ const auto* space_repo = snapshot.bucket_space_repo();
+ assert(space_repo != nullptr);
+ _op = std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
+ snapshot.steal_read_guard(), cmd, metrics);
return true;
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 655feb5d00c..9db078af198 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -8,6 +8,7 @@
#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <chrono>
+#include <mutex>
namespace storage {
@@ -39,7 +40,8 @@ public:
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator&,
- DistributorComponentRegister& compReg);
+ DistributorComponentRegister& compReg,
+ bool enable_concurrent_gets);
~ExternalOperationHandler() override;
@@ -55,6 +57,9 @@ private:
OperationSequencer _mutationSequencer;
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
+ mutable std::mutex _non_main_thread_ops_mutex;
+ OperationOwner _non_main_thread_ops_owner;
+ bool _enable_concurrent_gets;
template <typename Func>
void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
@@ -62,6 +67,8 @@ private:
PersistenceOperationMetricSet& metrics,
Func f);
+ void bounce_with_wrong_distribution(api::StorageCommand& cmd, const lib::ClusterState& cluster_state);
+ // Bounce with the current _default_ space cluster state.
void bounce_with_wrong_distribution(api::StorageCommand& cmd);
void bounce_with_busy_during_state_transition(api::StorageCommand& cmd,
const lib::ClusterState& current_state,
diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
index 8300f715cac..ec97e51b66d 100644
--- a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
+++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
@@ -3,25 +3,28 @@
namespace storage::distributor {
-OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context,
- std::shared_ptr<BucketDatabase::ReadGuard> read_guard)
+OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo* bucket_space_repo)
: _context(std::move(context)),
- _read_guard(std::move(read_guard))
+ _read_guard(std::move(read_guard)),
+ _bucket_space_repo(bucket_space_repo)
{}
OperationRoutingSnapshot::~OperationRoutingSnapshot() = default;
OperationRoutingSnapshot OperationRoutingSnapshot::make_not_routable_in_state(
- std::shared_ptr<ClusterDistributionContext> context)
+ std::shared_ptr<const BucketSpaceDistributionContext> context)
{
- return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>());
+ return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>(), nullptr);
}
OperationRoutingSnapshot OperationRoutingSnapshot::make_routable_with_guard(
- std::shared_ptr<ClusterDistributionContext> context,
- std::shared_ptr<BucketDatabase::ReadGuard> read_guard)
+ std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo& bucket_space_repo)
{
- return OperationRoutingSnapshot(std::move(context), std::move(read_guard));
+ return OperationRoutingSnapshot(std::move(context), std::move(read_guard), &bucket_space_repo);
}
}
diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
index 0bd8c5d928e..16ec8fef1c7 100644
--- a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
+++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
@@ -6,21 +6,37 @@
namespace storage::distributor {
-class ClusterDistributionContext;
+class BucketSpaceDistributionContext;
+class DistributorBucketSpaceRepo;
/**
- * TODO desc
+ * An "operation routing snapshot" is intended to provide a stable means of computing
+ * bucket routing targets and performing database lookups for a particular bucket space
+ * in a potentially multi-threaded setting. When using multiple threads, both the current
+ * cluster/distribution state and the underlying bucket database may change
+ * independently of each other when observed from any thread other than the main distributor
+ * thread. Additionally, the bucket management system may operate with separate read-only
+ * databases during state transitions, complicating things further.
+ *
+ * By using an OperationRoutingSnapshot, a caller gets a consistent view of the world
+ * that stays valid throughout the operation's lifetime.
+ *
+ * Note that holding the DB read guard should be done for as short a time as possible to
+ * avoid elevated memory usage caused by data stores not being able to free on-hold items.
*/
class OperationRoutingSnapshot {
- std::shared_ptr<ClusterDistributionContext> _context;
+ std::shared_ptr<const BucketSpaceDistributionContext> _context;
std::shared_ptr<BucketDatabase::ReadGuard> _read_guard;
+ const DistributorBucketSpaceRepo* _bucket_space_repo;
public:
- OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context,
- std::shared_ptr<BucketDatabase::ReadGuard> read_guard);
+ OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo* bucket_space_repo);
- static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<ClusterDistributionContext> context);
- static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<ClusterDistributionContext> context,
- std::shared_ptr<BucketDatabase::ReadGuard> read_guard);
+ static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<const BucketSpaceDistributionContext> context);
+ static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo& bucket_space_repo);
OperationRoutingSnapshot(const OperationRoutingSnapshot&) noexcept = default;
OperationRoutingSnapshot& operator=(const OperationRoutingSnapshot&) noexcept = default;
@@ -29,13 +45,16 @@ public:
~OperationRoutingSnapshot();
- const ClusterDistributionContext& context() const noexcept { return *_context; }
+ const BucketSpaceDistributionContext& context() const noexcept { return *_context; }
std::shared_ptr<BucketDatabase::ReadGuard> steal_read_guard() noexcept {
return std::move(_read_guard);
}
bool is_routable() const noexcept {
return (_read_guard.get() != nullptr);
}
+ const DistributorBucketSpaceRepo* bucket_space_repo() const noexcept {
+ return _bucket_space_repo;
+ }
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 768da935fd2..7ff2e298791 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -45,7 +45,7 @@ GetOperation::GroupId::operator==(const GroupId& other) const
}
GetOperation::GetOperation(DistributorComponent& manager,
- DistributorBucketSpace &bucketSpace,
+ const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
PersistenceOperationMetricSet& metric)
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 8d0fdb0c2bb..fe4dab5e9f2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -24,7 +24,7 @@ class GetOperation : public Operation
{
public:
GetOperation(DistributorComponent& manager,
- DistributorBucketSpace &bucketSpace,
+ const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
PersistenceOperationMetricSet& metric);
@@ -77,7 +77,7 @@ private:
std::map<GroupId, GroupVector> _responses;
DistributorComponent& _manager;
- DistributorBucketSpace &_bucketSpace;
+ const DistributorBucketSpace &_bucketSpace;
std::shared_ptr<api::GetCommand> _msg;