summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-04 14:50:55 +0200
committerGitHub <noreply@github.com>2019-10-04 14:50:55 +0200
commit7bb9233afcf04a82bf8210c910450f0efc5f83f5 (patch)
tree6479c293aa4bc6ee737ff0a6983a632aed31e7a4
parent685704ab391abc6efdb9577c24bdf7ed48333f8e (diff)
parentb2c66836548ffa4aabc380195026fdbd844d40e7 (diff)
Merge pull request #10861 from vespa-engine/vekterli/add-distribution-state-snapshotting
Add distribution state snapshotting
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp147
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp3
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp1
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp5
-rw-r--r--storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp81
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_distribution_context.h70
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp105
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h33
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_bucket_space.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributorinterface.h3
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp50
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h9
-rw-r--r--storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp30
-rw-r--r--storage/src/vespa/storage/distributor/operation_routing_snapshot.h60
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp3
21 files changed, 605 insertions, 44 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 321b0cc3bba..8409bd60986 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -2,11 +2,10 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
+#include <vespa/storage/distributor/bucket_space_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
-#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -124,7 +123,7 @@ public:
createLinks();
_bucketSpaces = getBucketSpaces();
// Disable deferred activation by default (at least for now) to avoid breaking the entire world.
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+ getBucketDBUpdater().set_stale_reads_enabled(false);
};
void TearDown() override {
@@ -2415,7 +2414,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
}
TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ getBucketDBUpdater().set_stale_reads_enabled(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
@@ -2468,7 +2467,7 @@ TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_on
}
TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+ getBucketDBUpdater().set_stale_reads_enabled(false);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
@@ -2497,7 +2496,6 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
uint32_t pending_buckets,
uint32_t pending_expected_msgs)
{
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state(initial_state_str);
setSystemState(initial_state);
ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size());
@@ -2514,6 +2512,7 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
}
TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2533,6 +2532,7 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until
}
TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2544,6 +2544,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated
}
TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2557,6 +2558,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_d
}
TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
@@ -2570,6 +2572,7 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_vers
}
TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2727,4 +2730,136 @@ TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_s
EXPECT_TRUE(state == nullptr);
}
+struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest {
+ lib::ClusterState empty_state;
+ std::shared_ptr<lib::ClusterState> initial_baseline;
+ std::shared_ptr<lib::ClusterState> initial_default;
+ lib::ClusterStateBundle initial_bundle;
+ Bucket default_bucket;
+ Bucket global_bucket;
+
+ BucketDBUpdaterSnapshotTest()
+ : BucketDBUpdaterTest(),
+ empty_state(),
+ initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
+ initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")),
+ initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+ {FixedBucketSpaces::global_space(), initial_baseline}}),
+ default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)),
+ global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
+ {
+ }
+ ~BucketDBUpdaterSnapshotTest() override;
+
+ void SetUp() override {
+ BucketDBUpdaterTest::SetUp();
+ getBucketDBUpdater().set_stale_reads_enabled(true);
+ };
+
+ // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space
+ uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
+ if (!rs.is_routable()) {
+ return 0;
+ }
+ auto guard = rs.steal_read_guard();
+ uint32_t found_buckets = 0;
+ for_each_bucket(repo, [&](const auto& space, const auto& entry) {
+ if (space == bucket_space) {
+ std::vector<BucketDatabase::Entry> entries;
+ guard->find_parents_and_self(entry.getBucketId(), entries);
+ if (entries.size() == 1) {
+ ++found_buckets;
+ }
+ }
+ });
+ return found_buckets;
+ }
+};
+
+BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+
+TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+ set_cluster_state_bundle(initial_bundle);
+ // State currently pending, empty initial state is active
+
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(def_rs.context().has_pending_state_transition());
+ EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString());
+
+ auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(global_rs.context().has_pending_state_transition());
+ EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString());
+
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0));
+ // State now activated, no pending
+
+ def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_FALSE(def_rs.context().has_pending_state_transition());
+
+ global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_FALSE(global_rs.context().has_pending_state_transition());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()),
+ n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()),
+ n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ // We're down in state 2 and therefore do not own any buckets
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()),
+ n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()),
+ n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+ getBucketDBUpdater().set_stale_reads_enabled(false);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 4a9ef147741..15820b64ff9 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -417,7 +417,8 @@ DistributorTestUtil::getBucketSpaces() const
void
DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state)
{
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(lib::ClusterState(state)));
+ getBucketDBUpdater().simulate_cluster_state_bundle_activation(
+ lib::ClusterStateBundle(lib::ClusterState(state)));
}
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 600e56faf31..84f7d34d069 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -505,6 +505,7 @@ document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_tr
std::string current = "version:123 distributor:2 storage:2";
std::string pending = "version:321 distributor:3 storage:3";
setupDistributor(1, 3, current);
+ getBucketDBUpdater().set_stale_reads_enabled(read_only_enabled);
getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled);
// Trigger pending cluster state
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 7c308e152db..99d7c12551d 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -3,6 +3,8 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/storage/bucketdb/bucketdatabase.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributormetricsset.h>
@@ -31,7 +33,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
std::unique_ptr<Operation> op;
GetOperationTest();
- ~GetOperationTest();
+ ~GetOperationTest() override;
void SetUp() override {
_repo.reset(
@@ -53,6 +55,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]");
op = std::make_unique<GetOperation>(
getExternalOperationHandler(), getDistributorBucketSpace(),
+ getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
msg, getDistributor().getMetrics(). gets[msg->getLoadType()]);
op->start(_sender, framework::MilliSecTime(0));
}
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
index c3ade3c2877..66d44a655e0 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
@@ -148,7 +148,9 @@ Entry BTreeBucketDatabase::entry_from_iterator(const BTree::ConstIterator& iter)
if (!iter.valid()) {
return Entry::createInvalid();
}
- return entry_from_value(iter.getKey(), iter.getData());
+ const auto value = iter.getData();
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return entry_from_value(iter.getKey(), value);
}
ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const {
@@ -156,6 +158,7 @@ ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::Co
return ConstEntryRef::createInvalid();
}
const auto value = iter.getData();
+ std::atomic_thread_fence(std::memory_order_acquire);
const auto replicas_ref = _store.get(entry_ref_from_value(value));
const auto bucket = BucketId(BucketId::keyToBucketId(iter.getKey()));
return const_entry_ref_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 8c701033e67..944df6e1708 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_library(storage_distributor
bucket_db_prune_elision.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
+ bucket_space_distribution_context.cpp
clusterinformation.cpp
distributor_bucket_space.cpp
distributor_bucket_space_repo.cpp
@@ -20,6 +21,7 @@ vespa_add_library(storage_distributor
idealstatemetricsset.cpp
messagetracker.cpp
nodeinfo.cpp
+ operation_routing_snapshot.cpp
operation_sequencer.cpp
operationowner.cpp
operationtargetresolver.cpp
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp
new file mode 100644
index 00000000000..53040bc42b1
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp
@@ -0,0 +1,81 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_space_distribution_context.h"
+
+namespace storage::distributor {
+
+BucketSpaceDistributionContext::~BucketSpaceDistributionContext() = default;
+
+BucketSpaceDistributionContext::BucketSpaceDistributionContext(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index)
+ : _active_cluster_state(std::move(active_cluster_state)),
+ _default_active_cluster_state(std::move(default_active_cluster_state)),
+ _pending_cluster_state(std::move(pending_cluster_state)),
+ _distribution(std::move(distribution)),
+ _this_node_index(this_node_index)
+{}
+
+std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_state_transition(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index)
+{
+ return std::make_shared<BucketSpaceDistributionContext>(
+ std::move(active_cluster_state), std::move(default_active_cluster_state),
+ std::move(pending_cluster_state), std::move(distribution),
+ this_node_index);
+}
+
+std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_stable_state(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index)
+{
+ return std::make_shared<BucketSpaceDistributionContext>(
+ std::move(active_cluster_state), std::move(default_active_cluster_state),
+ std::shared_ptr<const lib::ClusterState>(),
+ std::move(distribution), this_node_index);
+}
+
+std::shared_ptr<BucketSpaceDistributionContext>
+BucketSpaceDistributionContext::make_not_yet_initialized(uint16_t this_node_index)
+{
+ return std::make_shared<BucketSpaceDistributionContext>(
+ std::make_shared<const lib::ClusterState>(),
+ std::make_shared<const lib::ClusterState>(),
+ std::shared_ptr<const lib::ClusterState>(),
+ std::make_shared<const lib::Distribution>(),
+ this_node_index);
+}
+
+bool BucketSpaceDistributionContext::bucket_owned_in_state(const lib::ClusterState& state,
+ const document::BucketId& id) const
+{
+ try {
+ uint16_t owner_idx = _distribution->getIdealDistributorNode(state, id);
+ return (owner_idx == _this_node_index);
+ } catch (lib::TooFewBucketBitsInUseException&) {
+ return false;
+ } catch (lib::NoDistributorsAvailableException&) {
+ return false;
+ }
+}
+
+bool BucketSpaceDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const {
+ return bucket_owned_in_state(*_active_cluster_state, id);
+}
+
+bool BucketSpaceDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const {
+ if (_pending_cluster_state) {
+ return bucket_owned_in_state(*_pending_cluster_state, id);
+ }
+ return true; // No pending state, owned by default.
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h
new file mode 100644
index 00000000000..7a9c0fcae60
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h
@@ -0,0 +1,70 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <memory>
+#include <cstdint>
+
+namespace storage::distributor {
+
+/**
+ * Represents a consistent snapshot of cluster state and distribution config
+ * information at a particular point in time. This is sufficient to compute
+ * bucket ownership and distributions for the bucket space associated with
+ * the context.
+ *
+ * Since this is a snapshot in time, the context is immutable once created.
+ */
+class BucketSpaceDistributionContext {
+ std::shared_ptr<const lib::ClusterState> _active_cluster_state;
+ std::shared_ptr<const lib::ClusterState> _default_active_cluster_state;
+ std::shared_ptr<const lib::ClusterState> _pending_cluster_state; // May be null if no state is pending
+ std::shared_ptr<const lib::Distribution> _distribution; // TODO ideally should have a pending distribution as well
+ uint16_t _this_node_index;
+public:
+ BucketSpaceDistributionContext() = delete;
+ // Public due to make_shared, prefer factory functions to instantiate instead.
+ BucketSpaceDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+ ~BucketSpaceDistributionContext();
+
+ static std::shared_ptr<BucketSpaceDistributionContext> make_state_transition(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> pending_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+ static std::shared_ptr<BucketSpaceDistributionContext> make_stable_state(
+ std::shared_ptr<const lib::ClusterState> active_cluster_state,
+ std::shared_ptr<const lib::ClusterState> default_active_cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution,
+ uint16_t this_node_index);
+ static std::shared_ptr<BucketSpaceDistributionContext> make_not_yet_initialized(uint16_t this_node_index);
+
+ const std::shared_ptr<const lib::ClusterState>& active_cluster_state() const noexcept {
+ return _active_cluster_state;
+ }
+
+ const std::shared_ptr<const lib::ClusterState>& default_active_cluster_state() const noexcept {
+ return _default_active_cluster_state;
+ }
+ bool has_pending_state_transition() const noexcept {
+ return (_pending_cluster_state.get() != nullptr);
+ }
+ // Returned shared_ptr is nullptr iff has_pending_state_transition() == false.
+ const std::shared_ptr<const lib::ClusterState>& pending_cluster_state() const noexcept {
+ return _pending_cluster_state;
+ }
+
+ bool bucket_owned_in_state(const lib::ClusterState& state, const document::BucketId& id) const;
+ bool bucket_owned_in_active_state(const document::BucketId& id) const;
+ bool bucket_owned_in_pending_state(const document::BucketId& id) const;
+
+ uint16_t this_node_index() const noexcept { return _this_node_index; }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index a901ac28a54..227165a0911 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,6 +2,7 @@
#include "bucketdbupdater.h"
#include "bucket_db_prune_elision.h"
+#include "bucket_space_distribution_context.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
@@ -30,12 +31,44 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
_sender(sender),
- _transitionTimer(_distributorComponent.getClock())
+ _transitionTimer(_distributorComponent.getClock()),
+ _active_distribution_contexts(),
+ _distribution_context_mutex()
{
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ _active_distribution_contexts.emplace(
+ elem.first,
+ BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex()));
+ _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
+ }
}
BucketDBUpdater::~BucketDBUpdater() = default;
+OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ const auto bucket_space = bucket.getBucketSpace();
+ std::lock_guard lock(_distribution_context_mutex);
+ auto active_state_iter = _active_distribution_contexts.find(bucket_space);
+ assert(active_state_iter != _active_distribution_contexts.cend());
+ auto& state = *active_state_iter->second;
+ if (!state.bucket_owned_in_active_state(bucket.getBucketId())) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId());
+ if (!bucket_present_in_mutable_db && !stale_reads_enabled()) {
+ return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
+ }
+ const auto& space_repo = bucket_present_in_mutable_db
+ ? _distributorComponent.getBucketSpaceRepo()
+ : _distributorComponent.getReadOnlyBucketSpaceRepo();
+ auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
+ assert(existing_guard_iter != _explicit_transition_read_guard.cend());
+ auto db_guard = existing_guard_iter->second
+ ? existing_guard_iter-> second
+ : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard();
+ return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo);
+}
+
void
BucketDBUpdater::flush()
{
@@ -59,8 +92,7 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden
bool
BucketDBUpdater::shouldDeferStateEnabling() const noexcept
{
- return _distributorComponent.getDistributor().getConfig()
- .allowStaleReadsDuringClusterStateTransitions();
+ return stale_reads_enabled();
}
bool
@@ -258,6 +290,61 @@ BucketDBUpdater::replyToActivationWithActualVersion(
_distributorComponent.sendUp(reply); // TODO let API accept rvalues
}
+void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
+ std::lock_guard lock(_distribution_context_mutex);
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ // At this point, we're still operating with a distribution context _without_ a
+ // pending state, i.e. anyone using the context will expect to find buckets
+ // in the DB that correspond to how the database looked like prior to pruning
+ // buckets from the DB. To ensure this is not violated, take a snapshot of the
+ // _mutable_ DB and expose this. This snapshot only lives until we atomically
+ // flip to expose a distribution context that includes the new, pending state.
+ // At that point, the read-only DB is known to contain the buckets that have
+ // been pruned away, so we can release the mutable DB snapshot safely.
+ // TODO test for, and handle, state preemption case!
+ _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard();
+ }
+}
+
+
+void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get(
+ document::FixedBucketSpaces::default_space()).cluster_state_sp();
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto old_cluster_state = elem.second->cluster_state_sp();
+ auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ BucketSpaceDistributionContext::make_state_transition(
+ std::move(old_cluster_state),
+ old_default_state,
+ std::move(new_cluster_state),
+ std::move(new_distribution),
+ _distributorComponent.getIndex()));
+ // We can now remove the explicit mutable DB snapshot, as the buckets that have been
+ // pruned away are visible in the read-only DB.
+ _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
+ }
+}
+
+void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ std::lock_guard lock(_distribution_context_mutex);
+ const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
+ for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ auto new_distribution = elem.second->distribution_sp();
+ auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
+ _active_distribution_contexts.insert_or_assign(
+ elem.first,
+ BucketSpaceDistributionContext::make_stable_state(
+ std::move(new_cluster_state),
+ default_cluster_state,
+ std::move(new_distribution),
+ _distributorComponent.getIndex()));
+ }
+}
+
bool
BucketDBUpdater::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& cmd)
@@ -275,8 +362,10 @@ BucketDBUpdater::onSetSystemState(
ensureTransitionTimerStarted();
// Separate timer since _transitionTimer might span multiple pending states.
framework::MilliSecTimer process_timer(_distributorComponent.getClock());
-
- removeSuperfluousBuckets(cmd->getClusterStateBundle(), false);
+ update_read_snapshot_before_db_pruning();
+ const auto& bundle = cmd->getClusterStateBundle();
+ removeSuperfluousBuckets(bundle, false);
+ update_read_snapshot_after_db_pruning(bundle);
replyToPreviousPendingClusterStateIfAny();
ClusterInformation::CSP clusterInfo(
@@ -642,6 +731,7 @@ BucketDBUpdater::activatePendingClusterState()
_distributorComponent.getDistributor().notifyDistributionChangeEnabled();
}
+ update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
_pendingClusterState.reset();
_outdatedNodesMap.clear();
sendAllQueuedBucketRechecks();
@@ -665,6 +755,11 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
_distributorComponent.getDistributor().enableClusterStateBundle(state);
}
+void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
+ update_read_snapshot_after_activation(activated_state);
+ _distributorComponent.getDistributor().enableClusterStateBundle(activated_state);
+}
+
void
BucketDBUpdater::addCurrentStateToClusterStateHistory()
{
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index e69d328d8bc..86ceab14486 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -6,6 +6,7 @@
#include "distributorcomponent.h"
#include "distributormessagesender.h"
#include "pendingclusterstate.h"
+#include "operation_routing_snapshot.h"
#include "outdated_nodes_map.h"
#include <vespa/document/bucket/bucket.h>
#include <vespa/storageapi/messageapi/returncode.h>
@@ -15,7 +16,9 @@
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageframework/generic/status/statusreporter.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
+#include <atomic>
#include <list>
+#include <mutex>
namespace vespalib::xml {
class XmlOutputStream;
@@ -25,6 +28,7 @@ class XmlAttribute;
namespace storage::distributor {
class Distributor;
+class BucketSpaceDistributionContext;
class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
@@ -70,7 +74,14 @@ public:
return ((_pendingClusterState.get() != nullptr)
&& _pendingClusterState->hasBucketOwnershipTransfer());
}
+ void set_stale_reads_enabled(bool enabled) noexcept {
+ _stale_reads_enabled.store(enabled, std::memory_order_relaxed);
+ }
+ bool stale_reads_enabled() const noexcept {
+ return _stale_reads_enabled.load(std::memory_order_relaxed);
+ }
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
private:
DistributorComponent _distributorComponent;
class MergeReplyGuard {
@@ -129,6 +140,12 @@ private:
}
};
+ friend class DistributorTestUtil;
+ // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
+ // components agree on the currently active cluster state bundle.
+ // Transitively invokes Distributor::enableClusterStateBundle
+ void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state);
+
bool shouldDeferStateEnabling() const noexcept;
bool hasPendingClusterState() const;
bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
@@ -166,8 +183,11 @@ private:
void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
+ void update_read_snapshot_before_db_pruning();
void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState,
bool is_distribution_config_change);
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state);
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state);
void replyToPreviousPendingClusterStateIfAny();
void replyToActivationWithActualVersion(
@@ -182,9 +202,6 @@ private:
void maybe_inject_simulated_db_pruning_delay();
void maybe_inject_simulated_db_merging_delay();
- friend class BucketDBUpdater_Test;
- friend class MergeOperation_Test;
-
/**
Removes all copies of buckets that are on nodes that are down.
*/
@@ -235,6 +252,16 @@ private:
std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
OutdatedNodesMap _outdatedNodesMap;
framework::MilliSecTimer _transitionTimer;
+ std::atomic<bool> _stale_reads_enabled;
+ using DistributionContexts = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketSpaceDistributionContext>,
+ document::BucketSpace::hash>;
+ DistributionContexts _active_distribution_contexts;
+ using DbGuards = std::unordered_map<document::BucketSpace,
+ std::shared_ptr<BucketDatabase::ReadGuard>,
+ document::BucketSpace::hash>;
+ DbGuards _explicit_transition_read_guard;
+ mutable std::mutex _distribution_context_mutex;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 69b64ac8dc1..4adbdd32669 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -77,7 +77,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
- _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg),
+ _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo,
+ _idealStateManager, compReg, use_btree_database),
_threadPool(threadPool),
_initializingIsUp(true),
_doneInitializeHandler(doneInitHandler),
@@ -322,9 +323,7 @@ bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
if (msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply =
- std::dynamic_pointer_cast<api::StorageReply>(msg);
-
+ auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg);
if (handleReply(reply)) {
return true;
}
@@ -400,6 +399,10 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
}
}
+OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket& bucket) const {
+ return _bucketDBUpdater.read_snapshot_for_bucket(bucket);
+}
+
void
Distributor::notifyDistributionChangeEnabled()
{
@@ -834,6 +837,7 @@ Distributor::enableNextConfig()
_bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode());
_ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew());
_pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration());
+ _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions());
}
void
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 638704adf24..48d9145eec7 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -170,6 +170,8 @@ public:
return *_readOnlyBucketSpaceRepo;
}
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;
+
class Status;
class MetricUpdateHook : public framework::MetricUpdateHook
{
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
index 26a0ee9098c..8fbb99dfe89 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
@@ -48,6 +48,9 @@ public:
void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState);
const lib::ClusterState &getClusterState() const noexcept { return *_clusterState; }
+ const std::shared_ptr<const lib::ClusterState>& cluster_state_sp() const noexcept {
+ return _clusterState;
+ }
void setDistribution(std::shared_ptr<const lib::Distribution> distribution);
@@ -55,6 +58,9 @@ public:
const lib::Distribution& getDistribution() const noexcept {
return *_distribution;
}
+ const std::shared_ptr<const lib::Distribution>& distribution_sp() const noexcept {
+ return _distribution;
+ }
};
diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h
index d9f037bb8f1..aba58e112dc 100644
--- a/storage/src/vespa/storage/distributor/distributorinterface.h
+++ b/storage/src/vespa/storage/distributor/distributorinterface.h
@@ -4,6 +4,7 @@
#include "bucketgctimecalculator.h"
#include "distributormessagesender.h"
#include "bucketownership.h"
+#include "operation_routing_snapshot.h"
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/document/bucket/bucket.h>
@@ -49,6 +50,8 @@ public:
*/
virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0;
+ virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const = 0;
+
/**
* Returns true if the node is currently initializing.
*/
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 1b88f02cac6..221c516a56e 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_space_distribution_context.h"
#include "externaloperationhandler.h"
#include "distributor.h"
#include <vespa/document/base/documentid.h>
@@ -12,7 +13,6 @@
#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h>
#include <vespa/storage/distributor/operations/external/removelocationoperation.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
-#include <vespa/document/util/stringutil.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
@@ -30,11 +30,16 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator& gen,
- DistributorComponentRegister& compReg)
+ DistributorComponentRegister& compReg,
+ bool enable_concurrent_gets)
: DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"),
_operationGenerator(gen),
- _rejectFeedBeforeTimeReached() // At epoch
-{ }
+ _rejectFeedBeforeTimeReached(), // At epoch
+ _non_main_thread_ops_mutex(),
+ _non_main_thread_ops_owner(owner, getClock()),
+ _enable_concurrent_gets(enable_concurrent_gets)
+{
+}
ExternalOperationHandler::~ExternalOperationHandler() = default;
@@ -78,24 +83,32 @@ void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, cons
sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
}
-void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
+void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd,
+ const lib::ClusterState& cluster_state)
+{
// Distributor ownership is equal across bucket spaces, so always send back default space state.
// This also helps client avoid getting confused by possibly observing different actual
// (derived) state strings for global/non-global document types for the same state version.
// Similarly, if we've yet to activate any version at all we send back BUSY instead
// of a suspiciously empty WrongDistributionReply.
// TOOD consider NOT_READY instead of BUSY once we're sure this won't cause any other issues.
- const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
if (cluster_state.getVersion() != 0) {
auto cluster_state_str = cluster_state.toString();
- LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str());
+ LOG(debug, "Got %s with wrong distribution, sending back state '%s'",
+ cmd.toString().c_str(), cluster_state_str.c_str());
bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str));
} else { // Only valid for empty startup state
- LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY");
+ LOG(debug, "Got %s with wrong distribution, but no cluster state activated yet. Sending back BUSY",
+ cmd.toString().c_str());
bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet"));
}
}
+void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
+ const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ bounce_with_wrong_distribution(cmd, cluster_state);
+}
+
void ExternalOperationHandler::bounce_with_busy_during_state_transition(
api::StorageCommand& cmd,
const lib::ClusterState& current_state,
@@ -283,10 +296,23 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
{
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
auto& metrics = getMetrics().gets[cmd->getLoadType()];
- bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) {
- _op = std::make_shared<GetOperation>(*this, bucket_space_repo.get(cmd->getBucket().getBucketSpace()),
- cmd, metrics);
- });
+ auto snapshot = getDistributor().read_snapshot_for_bucket(bucket);
+ if (!snapshot.is_routable()) {
+ const auto& ctx = snapshot.context();
+ if (ctx.has_pending_state_transition()) {
+ bounce_with_busy_during_state_transition(*cmd, *ctx.default_active_cluster_state(),
+ *ctx.pending_cluster_state());
+ } else {
+ bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state());
+ metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates
+ }
+ return true;
+ }
+ // The snapshot is aware of whether stale reads are enabled, so we don't have to check that here.
+ const auto* space_repo = snapshot.bucket_space_repo();
+ assert(space_repo != nullptr);
+ _op = std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
+ snapshot.steal_read_guard(), cmd, metrics);
return true;
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 655feb5d00c..9db078af198 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -8,6 +8,7 @@
#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <chrono>
+#include <mutex>
namespace storage {
@@ -39,7 +40,8 @@ public:
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator&,
- DistributorComponentRegister& compReg);
+ DistributorComponentRegister& compReg,
+ bool enable_concurrent_gets);
~ExternalOperationHandler() override;
@@ -55,6 +57,9 @@ private:
OperationSequencer _mutationSequencer;
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
+ mutable std::mutex _non_main_thread_ops_mutex;
+ OperationOwner _non_main_thread_ops_owner;
+ bool _enable_concurrent_gets;
template <typename Func>
void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
@@ -62,6 +67,8 @@ private:
PersistenceOperationMetricSet& metrics,
Func f);
+ void bounce_with_wrong_distribution(api::StorageCommand& cmd, const lib::ClusterState& cluster_state);
+ // Bounce with the current _default_ space cluster state.
void bounce_with_wrong_distribution(api::StorageCommand& cmd);
void bounce_with_busy_during_state_transition(api::StorageCommand& cmd,
const lib::ClusterState& current_state,
diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
new file mode 100644
index 00000000000..ec97e51b66d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp
@@ -0,0 +1,30 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "operation_routing_snapshot.h"
+
+namespace storage::distributor {
+
+OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo* bucket_space_repo)
+ : _context(std::move(context)),
+ _read_guard(std::move(read_guard)),
+ _bucket_space_repo(bucket_space_repo)
+{}
+
+OperationRoutingSnapshot::~OperationRoutingSnapshot() = default;
+
+OperationRoutingSnapshot OperationRoutingSnapshot::make_not_routable_in_state(
+ std::shared_ptr<const BucketSpaceDistributionContext> context)
+{
+ return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>(), nullptr);
+}
+
+OperationRoutingSnapshot OperationRoutingSnapshot::make_routable_with_guard(
+ std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo& bucket_space_repo)
+{
+ return OperationRoutingSnapshot(std::move(context), std::move(read_guard), &bucket_space_repo);
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
new file mode 100644
index 00000000000..16ec8fef1c7
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h
@@ -0,0 +1,60 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/bucketdb/bucketdatabase.h>
+#include <memory>
+
+namespace storage::distributor {
+
+class BucketSpaceDistributionContext;
+class DistributorBucketSpaceRepo;
+
+/**
+ * An "operation routing snapshot" is intended to provide a stable means of computing
+ * bucket routing targets and performing database lookups for a particular bucket space
+ * in a potentially multi-threaded setting. When using multiple threads, both the current
+ * cluster/distribution state as well as the underlying bucket database may change
+ * independent of each other when observed from any other thread than the main distributor
+ * thread. Additionally, the bucket management system may operate with separate read-only
+ * databases during state transitions, complicating things further.
+ *
+ * By using an OperationRoutingSnapshot, a caller gets a consistent view of the world
+ * that stays valid throughout the operation's life time.
+ *
+ * Note that holding the DB read guard should be done for as short a time as possible to
+ * avoid elevated memory usage caused by data stores not being able to free on-hold items.
+ */
+class OperationRoutingSnapshot {
+ std::shared_ptr<const BucketSpaceDistributionContext> _context;
+ std::shared_ptr<BucketDatabase::ReadGuard> _read_guard;
+ const DistributorBucketSpaceRepo* _bucket_space_repo;
+public:
+ OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo* bucket_space_repo);
+
+ static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<const BucketSpaceDistributionContext> context);
+ static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<const BucketSpaceDistributionContext> context,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ const DistributorBucketSpaceRepo& bucket_space_repo);
+
+ OperationRoutingSnapshot(const OperationRoutingSnapshot&) noexcept = default;
+ OperationRoutingSnapshot& operator=(const OperationRoutingSnapshot&) noexcept = default;
+ OperationRoutingSnapshot(OperationRoutingSnapshot&&) noexcept = default;
+ OperationRoutingSnapshot& operator=(OperationRoutingSnapshot&&) noexcept = default;
+
+ ~OperationRoutingSnapshot();
+
+ const BucketSpaceDistributionContext& context() const noexcept { return *_context; }
+ std::shared_ptr<BucketDatabase::ReadGuard> steal_read_guard() noexcept {
+ return std::move(_read_guard);
+ }
+ bool is_routable() const noexcept {
+ return (_read_guard.get() != nullptr);
+ }
+ const DistributorBucketSpaceRepo* bucket_space_repo() const noexcept {
+ return _bucket_space_repo;
+ }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 6cfc688db0e..7ff2e298791 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -45,7 +45,8 @@ GetOperation::GroupId::operator==(const GroupId& other) const
}
GetOperation::GetOperation(DistributorComponent& manager,
- DistributorBucketSpace &bucketSpace,
+ const DistributorBucketSpace &bucketSpace,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
PersistenceOperationMetricSet& metric)
: Operation(),
@@ -58,7 +59,7 @@ GetOperation::GetOperation(DistributorComponent& manager,
_metric(metric),
_operationTimer(manager.getClock())
{
- assignTargetNodeGroups();
+ assignTargetNodeGroups(*read_guard);
}
void
@@ -213,13 +214,13 @@ GetOperation::sendReply(DistributorMessageSender& sender)
}
void
-GetOperation::assignTargetNodeGroups()
+GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard)
{
document::BucketIdFactory bucketIdFactory;
document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId());
std::vector<BucketDatabase::Entry> entries;
- _bucketSpace.getBucketDatabase().acquire_read_guard()->find_parents_and_self(bid, entries);
+ read_guard.find_parents_and_self(bid, entries);
for (uint32_t j = 0; j < entries.size(); ++j) {
const BucketDatabase::Entry& e = entries[j];
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 3936f13077e..fe4dab5e9f2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -3,7 +3,7 @@
#include <vespa/storageapi/defs.h>
#include <vespa/storage/distributor/operations/operation.h>
-#include <vespa/storage/bucketdb/bucketcopy.h>
+#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageframework/generic/clock/timer.h>
@@ -23,8 +23,11 @@ class DistributorBucketSpace;
class GetOperation : public Operation
{
public:
- GetOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
- std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric);
+ GetOperation(DistributorComponent& manager,
+ const DistributorBucketSpace &bucketSpace,
+ std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
+ std::shared_ptr<api::GetCommand> msg,
+ PersistenceOperationMetricSet& metric);
void onClose(DistributorMessageSender& sender) override;
void onStart(DistributorMessageSender& sender) override;
@@ -74,7 +77,7 @@ private:
std::map<GroupId, GroupVector> _responses;
DistributorComponent& _manager;
- DistributorBucketSpace &_bucketSpace;
+ const DistributorBucketSpace &_bucketSpace;
std::shared_ptr<api::GetCommand> _msg;
@@ -89,7 +92,7 @@ private:
void sendReply(DistributorMessageSender& sender);
bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
- void assignTargetNodeGroups();
+ void assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard);
bool copyIsOnLocalNode(const BucketCopy&) const;
/**
* Returns the vector index of the target to send to, or -1 if none
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index b7ebafc114c..b3326a43be2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -178,7 +178,8 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]");
copyMessageSettings(*_updateCmd, *get);
- auto getOperation = std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric);
+ auto getOperation = std::make_shared<GetOperation>(
+ _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric);
GetOperation & op = *getOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender);
op.start(intermediate, _manager.getClock().getTimeInMillis());