summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-26 14:39:50 +0100
committerGitHub <noreply@github.com>2019-03-26 14:39:50 +0100
commitb9ab7c6ae6e1931326c0f8b4e5cee09a2b1152ac (patch)
treea12edd9028888c048606df280e2a48ae1bd2f13f /storage
parent313411c31cc8b43f353565d714bc76bc1ca43e5c (diff)
parenta599019904e9f3673b8d834efd28350604d8b7fd (diff)
Merge pull request #8882 from vespa-engine/vekterli/add-read-only-support-during-cluster-state-transitions
Add read-only support during cluster state transitions
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp304
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp10
-rw-r--r--storage/src/tests/distributor/distributortestutil.h2
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp162
-rw-r--r--storage/src/tests/storageserver/bouncertest.cpp14
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp88
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp30
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h8
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def7
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp127
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h21
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp19
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h11
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h15
-rw-r--r--storage/src/vespa/storage/distributor/distributormetricsset.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributormetricsset.h2
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp136
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h15
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h55
-rw-r--r--storage/src/vespa/storage/storageserver/bouncer.cpp1
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp22
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h1
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp13
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h1
32 files changed, 962 insertions, 151 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index b2d554c1e42..9795f5db5dc 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -26,6 +26,8 @@ using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
using document::BucketSpace;
using document::FixedBucketSpaces;
+using document::BucketId;
+using document::Bucket;
namespace storage::distributor {
@@ -112,6 +114,14 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture,
CPPUNIT_TEST(adding_diverging_replica_to_existing_trusted_does_not_remove_trusted);
CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted);
CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection);
+ CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change);
+ CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database);
+ CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled);
+ CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received);
+ CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated);
+ CPPUNIT_TEST(read_only_db_is_populated_even_when_self_is_marked_down);
+ CPPUNIT_TEST(activate_cluster_state_request_with_mismatching_version_returns_actual_version);
+ CPPUNIT_TEST(activate_cluster_state_request_without_pending_transition_passes_message_through);
CPPUNIT_TEST_SUITE_END();
public:
@@ -123,10 +133,7 @@ protected:
void testDistributorChangeWithGrouping();
void testNormalUsageInitializing();
void testFailedRequestBucketInfo();
- void testNoResponses();
void testBitChange();
- void testInconsistentChecksum();
- void testAddEmptyNode();
void testNodeDown();
void testStorageNodeInMaintenanceClearsBucketsForNode();
void testNodeDownCopiesGetInSync();
@@ -177,6 +184,14 @@ protected:
void adding_diverging_replica_to_existing_trusted_does_not_remove_trusted();
void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted();
void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection();
+ void non_owned_buckets_moved_to_read_only_db_on_ownership_change();
+ void buckets_no_longer_available_are_not_moved_to_read_only_database();
+ void non_owned_buckets_purged_when_read_only_support_is_config_disabled();
+ void deferred_activated_state_does_not_enable_state_until_activation_received();
+ void read_only_db_cleared_once_pending_state_is_activated();
+ void read_only_db_is_populated_even_when_self_is_marked_down();
+ void activate_cluster_state_request_with_mismatching_version_returns_actual_version();
+ void activate_cluster_state_request_without_pending_transition_passes_message_through();
auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); }
@@ -190,13 +205,32 @@ protected:
getBucketDBUpdater().getDistributorComponent().getIndex(),
clusterStateBundle,
"ui"));
- auto &repo = getBucketSpaceRepo();
- for (auto &elem : repo) {
- elem.second->setClusterState(clusterStateBundle.getDerivedClusterState(elem.first));
+ for (auto* repo : {&mutable_repo(), &read_only_repo()}) {
+ for (auto& space : *repo) {
+ space.second->setClusterState(clusterStateBundle.getDerivedClusterState(space.first));
+ }
}
return clusterInfo;
}
+ DistributorBucketSpaceRepo& mutable_repo() noexcept { return getBucketSpaceRepo(); }
+ // Note: not calling this "immutable_repo" since it may actually be modified by the pending
+ // cluster state component (just not by operations), so it would not have the expected semantics.
+ DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); }
+
+ BucketDatabase& mutable_default_db() noexcept {
+ return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ BucketDatabase& mutable_global_db() noexcept {
+ return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+ BucketDatabase& read_only_default_db() noexcept {
+ return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ BucketDatabase& read_only_global_db() noexcept {
+ return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+
static std::string getNodeList(std::vector<uint16_t> nodes, size_t count);
std::string getNodeList(std::vector<uint16_t> nodes);
@@ -210,11 +244,17 @@ protected:
return messagesPerBucketSpace * _bucketSpaces.size();
}
+ void trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs);
+
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
void setUp() override {
createLinks();
_bucketSpaces = getBucketSpaces();
+ // Disable deferred activation by default (at least for now) to avoid breaking the entire world.
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
};
void tearDown() override {
@@ -228,7 +268,7 @@ public:
uint32_t bucketCount,
uint32_t invalidBucketCount = 0)
{
- RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd);
+ auto sreply = std::make_shared<RequestBucketInfoReply>(cmd);
sreply->setAddress(storageAddress(storageIndex));
api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
@@ -261,7 +301,7 @@ public:
}
}
- return std::shared_ptr<api::RequestBucketInfoReply>(sreply);
+ return sreply;
}
void fakeBucketReply(const lib::ClusterState &state,
@@ -371,8 +411,7 @@ public:
void setSystemState(const lib::ClusterState& state) {
const size_t sizeBeforeState = _sender.commands.size();
getBucketDBUpdater().onSetSystemState(
- std::shared_ptr<api::SetSystemStateCommand>(
- new api::SetSystemStateCommand(state)));
+ std::make_shared<api::SetSystemStateCommand>(state));
// A lot of test logic has the assumption that all messages sent as a
// result of cluster state changes will be in increasing index order
// (for simplicity, not because this is required for correctness).
@@ -381,6 +420,26 @@ public:
sortSentMessagesByIndex(_sender, sizeBeforeState);
}
+ void set_cluster_state_bundle(const lib::ClusterStateBundle& state) {
+ const size_t sizeBeforeState = _sender.commands.size();
+ getBucketDBUpdater().onSetSystemState(
+ std::make_shared<api::SetSystemStateCommand>(state));
+ sortSentMessagesByIndex(_sender, sizeBeforeState);
+ }
+
+ bool activate_cluster_state_version(uint32_t version) {
+ return getBucketDBUpdater().onActivateClusterStateVersion(
+ std::make_shared<api::ActivateClusterStateVersionCommand>(version));
+ }
+
+ void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) {
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies.back().get());
+ CPPUNIT_ASSERT(response != nullptr);
+ CPPUNIT_ASSERT_EQUAL(version, response->actualVersion());
+ _sender.clear();
+ }
+
void completeBucketInfoGathering(const lib::ClusterState& state,
size_t expectedMsgs,
uint32_t bucketCount = 1,
@@ -586,8 +645,9 @@ public:
OutdatedNodesMap outdatedNodesMap;
state = PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodesMap,
- api::Timestamp(1));
+ clock, clusterInfo, sender,
+ owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(),
+ cmd, outdatedNodesMap, api::Timestamp(1));
}
PendingClusterStateFixture(
@@ -598,23 +658,22 @@ public:
owner.createClusterInfo(oldClusterState));
state = PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1));
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(),
+ owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1));
}
};
- auto createPendingStateFixtureForStateChange(
+ std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForStateChange(
const std::string& oldClusterState,
const std::string& newClusterState)
{
- return std::make_unique<PendingClusterStateFixture>(
- *this, oldClusterState, newClusterState);
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState);
}
- auto createPendingStateFixtureForDistributionChange(
+ std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForDistributionChange(
const std::string& oldClusterState)
{
- return std::make_unique<PendingClusterStateFixture>(
- *this, oldClusterState);
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
}
};
@@ -622,8 +681,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest);
BucketDBUpdaterTest::BucketDBUpdaterTest()
: CppUnit::TestFixture(),
- DistributorTestUtil(),
- _bucketSpaces()
+ DistributorTestUtil(),
+ _bucketSpaces()
{
}
@@ -1533,7 +1592,8 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1)));
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
+ getReadOnlyBucketSpaceRepo(), api::Timestamp(1)));
sortSentMessagesByIndex(sender);
@@ -1698,8 +1758,8 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
OutdatedNodesMap outdatedNodesMap;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
- api::Timestamp(1)));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ cmd, outdatedNodesMap, api::Timestamp(1)));
CPPUNIT_ASSERT_EQUAL(messageCount(3), sender.commands.size());
@@ -1863,8 +1923,8 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
- beforeTime));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ cmd, outdatedNodesMap, beforeTime));
parseInputData(existingData, beforeTime, *state, includeBucketInfo);
state->mergeIntoBucketDatabases();
@@ -1882,8 +1942,8 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
- afterTime));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ cmd, outdatedNodesMap, afterTime));
parseInputData(newData, afterTime, *state, includeBucketInfo);
state->mergeIntoBucketDatabases();
@@ -2599,4 +2659,192 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
CPPUNIT_ASSERT_EQUAL(current_hash, new_current_req.getDistributionHash());
}
+namespace {
+
+template <typename Func>
+void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) {
+ BucketId last(0);
+ auto e = db.getNext(last);
+ while (e.valid()) {
+ f(space, e);
+ e = db.getNext(e.getBucketId());
+ }
+}
+
+template <typename Func>
+void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
+ for (const auto& space : repo) {
+ for_each_bucket(space.second->getBucketDatabase(), space.first, f);
+ }
+}
+
+}
+
+using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
+
+void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_change() {
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ constexpr uint32_t n_buckets = 10;
+ completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ _sender.clear();
+
+ CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_global_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+
+ lib::ClusterState pending_state("distributor:2 storage:4");
+
+ std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state;
+ for_each_bucket(mutable_repo(), [&](const auto& space, const auto& entry) {
+ if (!getBucketDBUpdater().getDistributorComponent()
+ .ownsBucketInState(pending_state, makeDocumentBucket(entry.getBucketId()))) {
+ buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId()));
+ }
+ });
+ CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty());
+
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
+
+ const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces
+ const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space;
+ CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_global_db().size());
+ CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_global_db().size());
+
+ for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) {
+ CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId()))
+ != buckets_not_owned_in_pending_state.end());
+ });
+}
+
+void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() {
+ constexpr uint32_t n_buckets = 10;
+ // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
+ // cause some buckets to be entirely unavailable.
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+}
+
+void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() {
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ constexpr uint32_t n_buckets = 10;
+ completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ _sender.clear();
+
+ // Nothing in read-only DB after first bulk load of buckets.
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+
+ lib::ClusterState pending_state("distributor:2 storage:4");
+ setSystemState(pending_state);
+ // No buckets should be moved into read only db after ownership changes.
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+}
+
+void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state_str,
+ uint32_t initial_buckets,
+ uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state_str,
+ uint32_t pending_buckets,
+ uint32_t pending_expected_msgs)
+{
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ lib::ClusterState initial_state(initial_state_str);
+ setSystemState(initial_state);
+ CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size());
+ completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets);
+ _sender.clear();
+
+ lib::ClusterState pending_state(pending_state_str); // Ownership change
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true));
+ CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size());
+ completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets);
+ _sender.clear();
+}
+
+void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4);
+
+ // Version should not be switched over yet
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
+
+ CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+
+ CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size());
+}
+
+void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 storage:4", n_buckets, 0);
+ CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size());
+}
+
+void BucketDBUpdaterTest::read_only_db_is_populated_even_when_self_is_marked_down() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0);
+
+ // State not yet activated, so read-only DBs have got all the buckets we used to have.
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_global_db().size());
+}
+
+void BucketDBUpdaterTest::activate_cluster_state_request_with_mismatching_version_returns_actual_version() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
+ "version:5 distributor:2 storage:4", n_buckets, 0);
+
+ CPPUNIT_ASSERT(activate_cluster_state_version(4)); // Too old version
+ assert_has_activate_cluster_state_reply_with_actual_version(5);
+
+ CPPUNIT_ASSERT(activate_cluster_state_version(6)); // More recent version than what has been observed
+ assert_has_activate_cluster_state_reply_with_actual_version(5);
+}
+
+void BucketDBUpdaterTest::activate_cluster_state_request_without_pending_transition_passes_message_through() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4);
+ // Activate version 2; no pending cluster state after this.
+ CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+
+ // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager.
+ // Note: state manager is not modelled in this test, so we just check that the message handler returns
+ // false (meaning "didn't take message ownership") and there's no auto-generated reply.
+ CPPUNIT_ASSERT(!activate_cluster_state_version(3));
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size());
+}
+
+// TODO rename distributor config to imply two-phase functionality explicitly?
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index d3496d0c9f6..3f7f2eac63a 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -388,6 +388,16 @@ DistributorTestUtil::getBucketSpaceRepo() const {
return _distributor->getBucketSpaceRepo();
}
+DistributorBucketSpaceRepo &
+DistributorTestUtil::getReadOnlyBucketSpaceRepo() {
+ return _distributor->getReadOnlyBucketSpaceRepo();
+}
+
+const DistributorBucketSpaceRepo &
+DistributorTestUtil::getReadOnlyBucketSpaceRepo() const {
+ return _distributor->getReadOnlyBucketSpaceRepo();
+}
+
const lib::Distribution&
DistributorTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 10cc5eeaca1..420111437d2 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -132,6 +132,8 @@ public:
const BucketDatabase& getBucketDatabase(document::BucketSpace space) const;
DistributorBucketSpaceRepo &getBucketSpaceRepo();
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo();
+ const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const;
const lib::Distribution& getDistribution() const;
// "End to end" distribution change trigger, which will invoke the bucket
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index ddf88f50c36..40fe885dcb1 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/update/documentupdate.h>
@@ -20,8 +21,11 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest);
CPPUNIT_TEST(testBucketSplitMask);
- CPPUNIT_TEST(testOperationRejectedOnWrongDistribution);
- CPPUNIT_TEST(testOperationRejectedOnPendingWrongDistribution);
+ CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution);
+ CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution);
+ CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet);
+ CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution);
+ CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet);
CPPUNIT_TEST(reject_put_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_update_if_not_past_safe_time_point);
@@ -37,6 +41,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict);
CPPUNIT_TEST(sequencing_works_across_mutation_types);
CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled);
+ CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period);
+ CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period);
+ CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled);
CPPUNIT_TEST_SUITE_END();
document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state);
@@ -49,10 +56,13 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type,
const vespalib::string& id) const;
std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const;
+ std::shared_ptr<api::UpdateCommand> makeUpdateCommandForUser(uint64_t id) const;
std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type,
const vespalib::string& id) const;
std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const;
+ void verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd);
+
Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd);
void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd);
@@ -80,10 +90,16 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"};
+ // Returns an arbitrary bucket not owned in the pending state
+ document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled);
+
protected:
void testBucketSplitMask();
- void testOperationRejectedOnWrongDistribution();
- void testOperationRejectedOnPendingWrongDistribution();
+ void mutating_operation_wdr_bounced_on_wrong_current_distribution();
+ void mutating_operation_busy_bounced_on_wrong_pending_distribution();
+ void mutating_operation_busy_bounced_if_no_cluster_state_received_yet();
+ void read_only_operation_wdr_bounced_on_wrong_current_distribution();
+ void read_only_operation_busy_bounced_if_no_cluster_state_received_yet();
void reject_put_if_not_past_safe_time_point();
void reject_remove_if_not_past_safe_time_point();
void reject_update_if_not_past_safe_time_point();
@@ -99,6 +115,9 @@ protected:
void concurrent_get_and_mutation_do_not_conflict();
void sequencing_works_across_mutation_types();
void sequencing_can_be_explicitly_config_disabled();
+ void gets_are_started_with_mutable_db_outside_transition_period();
+ void gets_are_started_with_read_only_db_during_transition_period();
+ void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled();
void assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd);
@@ -220,6 +239,11 @@ ExternalOperationHandlerTest::makeUpdateCommand() const {
return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz");
}
+std::shared_ptr<api::UpdateCommand>
+ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const {
+ return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id));
+}
+
std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand(
const vespalib::string& doc_type,
const vespalib::string& id) const {
@@ -233,10 +257,30 @@ std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveComm
}
void
-ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
+ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
- std::string state("distributor:2 storage:2");
+ std::string state("version:1 distributor:2 storage:2");
+ setupDistributor(1, 2, state);
+
+ document::BucketId bucket(findNonOwnedUserBucketInState(state));
+ auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits());
+
+ Operation::SP genOp;
+ CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
+ CPPUNIT_ASSERT(!genOp.get());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(WRONG_DISTRIBUTION, "
+ "version:1 distributor:2 storage:2)"),
+ _sender.replies[0]->getResult().toString());
+}
+
+void
+ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution()
+{
+ createLinks();
+ std::string state("version:1 distributor:2 storage:2");
setupDistributor(1, 2, state);
document::BucketId bucket(findNonOwnedUserBucketInState(state));
@@ -248,43 +292,65 @@ ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
CPPUNIT_ASSERT_EQUAL(
std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:2 storage:2)"),
+ "version:1 distributor:2 storage:2)"),
_sender.replies[0]->getResult().toString());
}
void
-ExternalOperationHandlerTest::testOperationRejectedOnPendingWrongDistribution()
+ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution()
{
createLinks();
- std::string current("distributor:2 storage:2");
- std::string pending("distributor:3 storage:3");
+ std::string current("version:10 distributor:2 storage:2");
+ std::string pending("version:11 distributor:3 storage:3");
setupDistributor(1, 3, current);
document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending));
// Trigger pending cluster state
- auto stateCmd = std::make_shared<api::SetSystemStateCommand>(
- lib::ClusterState(pending));
+ auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending));
getBucketDBUpdater().onSetSystemState(stateCmd);
- auto cmd = makeGetCommandForUser(b.withoutCountBits());
+ auto cmd = makeUpdateCommandForUser(b.withoutCountBits());
Operation::SP genOp;
CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
CPPUNIT_ASSERT(!genOp.get());
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
- // Fail back with _pending_ cluster state so client can start trying
- // correct distributor immediately. If that distributor has not yet
- // completed processing its pending cluster state, it'll return the
- // old (current) cluster state, causing the client to bounce between
- // the two until the pending states have been resolved. This is pretty
- // much inevitable with the current design.
CPPUNIT_ASSERT_EQUAL(
- std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:3 storage:3)"),
+ std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"),
_sender.replies[0]->getResult().toString());
}
+void
+ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd)
+{
+ createLinks();
+ std::string state{}; // No version --> not yet received
+ setupDistributor(1, 2, state);
+
+ Operation::SP genOp;
+ CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
+ CPPUNIT_ASSERT(!genOp.get());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(BUSY, No cluster state activated yet)"),
+ _sender.replies[0]->getResult().toString());
+}
+
+// TODO NOT_READY is a more appropriate return code for this case, but must ensure it's
+// handled gracefully and silently through the stack. BUSY is a safe bet until then.
+void
+ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet()
+{
+ verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345));
+}
+
+void
+ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet()
+{
+ verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345));
+}
+
using TimePoint = ExternalOperationHandler::TimePoint;
using namespace std::literals::chrono_literals;
@@ -292,7 +358,7 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd)
{
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(9);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -327,7 +393,7 @@ void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() {
void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(9);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -342,7 +408,7 @@ void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() {
void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(10);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -360,7 +426,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached
void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
}
Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected(
@@ -486,6 +552,52 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled(
start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
}
+void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() {
+ createLinks();
+ std::string current = "version:1 distributor:1 storage:3";
+ setupDistributor(1, 3, current);
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+
+ document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter
+
+ auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits()));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space());
+ CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace());
+}
+
+document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) {
+ createLinks();
+ std::string current = "version:123 distributor:2 storage:2";
+ std::string pending = "version:321 distributor:3 storage:3";
+ setupDistributor(1, 3, current);
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled);
+
+ // Trigger pending cluster state
+ auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending));
+ getBucketDBUpdater().onSetSystemState(stateCmd);
+ return findOwned1stNotOwned2ndInStates(current, pending);
+}
+
+void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() {
+ auto non_owned_bucket = set_up_pending_cluster_state_transition(true);
+
+ auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits()));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space());
+ CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace());
+}
+
+void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() {
+ auto non_owned_bucket = set_up_pending_cluster_state_transition(false);
+
+ start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits()));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"),
+ _sender.replies[0]->getResult().toString());
+
+}
+
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp
index 27c13a3707e..371c24accbc 100644
--- a/storage/src/tests/storageserver/bouncertest.cpp
+++ b/storage/src/tests/storageserver/bouncertest.cpp
@@ -43,6 +43,7 @@ struct BouncerTest : public CppUnit::TestFixture {
void outOfBoundsConfigValuesThrowException();
void abort_request_when_derived_bucket_space_node_state_is_marked_down();
void client_operations_are_allowed_through_on_cluster_state_down_distributor();
+ void cluster_state_activation_commands_are_not_bounced();
CPPUNIT_TEST_SUITE(BouncerTest);
CPPUNIT_TEST(testFutureTimestamp);
@@ -57,6 +58,7 @@ struct BouncerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(outOfBoundsConfigValuesThrowException);
CPPUNIT_TEST(abort_request_when_derived_bucket_space_node_state_is_marked_down);
CPPUNIT_TEST(client_operations_are_allowed_through_on_cluster_state_down_distributor);
+ CPPUNIT_TEST(cluster_state_activation_commands_are_not_bounced);
CPPUNIT_TEST_SUITE_END();
using Priority = api::StorageMessage::Priority;
@@ -368,5 +370,17 @@ void BouncerTest::client_operations_are_allowed_through_on_cluster_state_down_di
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _manager->metrics().unavailable_node_aborts.getValue());
}
+void BouncerTest::cluster_state_activation_commands_are_not_bounced() {
+ tearDown();
+ setUpAsNode(lib::NodeType::DISTRIBUTOR);
+
+ auto state = makeClusterStateBundle("version:10 distributor:3 .2.s:d storage:3", {}); // Our index (2) is down
+ _node->getNodeStateUpdater().setClusterStateBundle(state);
+
+ auto activate_cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(11);
+ _upper->sendDown(activate_cmd);
+ assertMessageNotBounced();
+}
+
} // storage
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
index 84051041d25..d40b230d725 100644
--- a/storage/src/tests/storageserver/fnet_listener_test.cpp
+++ b/storage/src/tests/storageserver/fnet_listener_test.cpp
@@ -27,6 +27,9 @@ public:
CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version);
CPPUNIT_TEST_SUITE_END();
void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
@@ -35,6 +38,9 @@ public:
void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
+ void true_deferred_activation_flag_can_be_roundtrip_encoded();
+ void false_deferred_activation_flag_can_be_roundtrip_encoded();
+ void activate_cluster_state_version_rpc_enqueues_command_with_version();
};
CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
@@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler {
FNET_Connection* GetConnection() override { return nullptr; }
};
-struct Fixture {
+struct FixtureBase {
// TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
mbus::Slobrok slobrok;
vdstestlib::DirConfig config;
MockOperationEnqueuer enqueuer;
std::unique_ptr<FNetListener> fnet_listener;
- SlimeClusterStateBundleCodec codec;
DummyReturnHandler return_handler;
bool request_is_detached{false};
FRT_RPCRequest* bound_request{nullptr};
- Fixture() : config(getStandardConfig(true)) {
+ FixtureBase()
+ : config(getStandardConfig(true))
+ {
config.getConfig("stor-server").set("node_index", "1");
addSlobrokConfig(config, slobrok);
fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
}
- ~Fixture() {
+ virtual ~FixtureBase() {
// Must destroy any associated message contexts that may have refs to FRT_Request
// instance _before_ we destroy the request itself.
enqueuer._enqueued.clear();
@@ -79,6 +86,12 @@ struct Fixture {
bound_request->SubRef();
}
}
+};
+
+struct SetStateFixture : FixtureBase {
+ SlimeClusterStateBundleCodec codec;
+
+ SetStateFixture() : FixtureBase() {}
void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
@@ -123,6 +136,10 @@ struct Fixture {
lib::ClusterStateBundle dummy_baseline_bundle() const {
return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
}
+
+ lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const {
+ return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred);
+ }
};
std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
@@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() {
ss.str().data(), ss.str().data());
}
-}
+} // anon namespace
void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
auto baseline = f.dummy_baseline_bundle();
f.assert_request_received_and_propagated(baseline);
}
void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
lib::ClusterStateBundle spaces_bundle(
lib::ClusterState("version:123 distributor:3 storage:3"),
{{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
@@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command
}
void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
- Fixture f;
+ SetStateFixture f;
auto state_str = make_compressable_state_string();
lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
@@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
}
void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
- Fixture f;
+ SetStateFixture f;
f.create_request(f.dummy_baseline_bundle());
f.fnet_listener->close();
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
}
void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
+void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true));
+
+}
+
+void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false));
+}
+
+struct ActivateStateFixture : FixtureBase {
+ ActivateStateFixture() : FixtureBase() {}
+
+ void bind_request_params(uint32_t activate_version) {
+ bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
+ auto* params = bound_request->GetParams();
+ params->AddInt32(activate_version);
+
+ bound_request->SetDetachedPT(&request_is_detached);
+ bound_request->SetReturnHandler(&return_handler);
+ }
+
+ void create_request(uint32_t activate_version) {
+ // Only 1 request allowed per fixture due to lifetime handling snags
+ assert(bound_request == nullptr);
+ bind_request_params(activate_version);
+ }
+
+ void assert_enqueued_operation_has_activate_version(uint32_t version) {
+ CPPUNIT_ASSERT(bound_request != nullptr);
+ CPPUNIT_ASSERT(request_is_detached);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
+ auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]);
+ CPPUNIT_ASSERT_EQUAL(version, state_request.version());
+ }
+
+ void assert_request_received_and_propagated(uint32_t activate_version) {
+ create_request(activate_version);
+ fnet_listener->RPC_activateClusterStateVersion(bound_request);
+ assert_enqueued_operation_has_activate_version(activate_version);
+ }
+};
+
+void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() {
+ ActivateStateFixture f;
+ f.assert_request_received_and_propagated(1234567);
+}
+
}
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index 19f414482db..cdf990fa28f 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -37,6 +37,7 @@ struct StateManagerTest : public CppUnit::TestFixture {
void can_explicitly_send_get_node_state_reply();
void explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request();
void immediate_node_state_replying_is_tracked_per_controller();
+ void activation_command_is_bounced_with_current_cluster_state_version();
CPPUNIT_TEST_SUITE(StateManagerTest);
CPPUNIT_TEST(testSystemState);
@@ -45,8 +46,10 @@ struct StateManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(can_explicitly_send_get_node_state_reply);
CPPUNIT_TEST(explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request);
CPPUNIT_TEST(immediate_node_state_replying_is_tracked_per_controller);
+ CPPUNIT_TEST(activation_command_is_bounced_with_current_cluster_state_version);
CPPUNIT_TEST_SUITE_END();
+ void force_current_cluster_state_version(uint32_t version);
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -101,6 +104,12 @@ StateManagerTest::tearDown() {
_metricManager.reset();
}
+void StateManagerTest::force_current_cluster_state_version(uint32_t version) {
+ ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
+ state.setVersion(version);
+ _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+}
+
#define GET_ONLY_OK_REPLY(varname) \
{ \
CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \
@@ -236,9 +245,7 @@ StateManagerTest::testReportedNodeState()
}
void StateManagerTest::current_cluster_state_version_is_included_in_host_info_json() {
- ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
- state.setVersion(123);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ force_current_cluster_state_version(123);
std::string nodeInfoString(_manager->getNodeInfo());
vespalib::Memory goldenMemory(nodeInfoString);
@@ -343,4 +350,21 @@ void StateManagerTest::immediate_node_state_replying_is_tracked_per_controller()
CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies());
}
+void StateManagerTest::activation_command_is_bounced_with_current_cluster_state_version() {
+ force_current_cluster_state_version(12345);
+
+ auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
+ cmd->setTimeout(10000000);
+ cmd->setSourceIndex(0);
+ _upper->sendDown(cmd);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies());
+ std::shared_ptr<api::StorageReply> reply;
+ GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
+ auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12340), activate_reply.activateVersion());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12345), activate_reply.actualVersion());
+}
+
} // storage
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 44cf56fdff8..294ce56f536 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -34,6 +34,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_enableHostInfoReporting(true),
_disableBucketActivation(false),
_sequenceMutatingOperations(true),
+ _allowStaleReadsDuringClusterStateTransitions(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -144,6 +145,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_enableHostInfoReporting = config.enableHostInfoReporting;
_disableBucketActivation = config.disableBucketActivation;
_sequenceMutatingOperations = config.sequenceMutatingOperations;
+ _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 5dfc4f66cb8..8c84fef47b5 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -235,6 +235,13 @@ public:
void setSequenceMutatingOperations(bool sequenceMutations) noexcept {
_sequenceMutatingOperations = sequenceMutations;
}
+
+ bool allowStaleReadsDuringClusterStateTransitions() const noexcept {
+ return _allowStaleReadsDuringClusterStateTransitions;
+ }
+ void setAllowStaleReadsDuringClusterStateTransitions(bool allow) noexcept {
+ _allowStaleReadsDuringClusterStateTransitions = allow;
+ }
private:
DistributorConfiguration(const DistributorConfiguration& other);
@@ -274,6 +281,7 @@ private:
bool _enableHostInfoReporting;
bool _disableBucketActivation;
bool _sequenceMutatingOperations;
+ bool _allowStaleReadsDuringClusterStateTransitions;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 89aad427ca9..d4f69073cc6 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -184,3 +184,10 @@ sequence_mutating_operations bool default=true
## towards a node if it has indicated that its merge queues are full or it is
## suffering from resource exhaustion.
inhibit_merge_sending_on_busy_node_duration_sec int default=10
+
+## If set, enables potentially stale reads during cluster state transitions where
+## buckets change ownership. This also implicitly enables support for two-phase
+## cluster state transitions on the distributor.
+## For this option to take effect, the cluster controller must also have two-phase
+## states enabled.
+allow_stale_reads_during_cluster_state_transitions bool default=false
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index a223001af79..e9595b4a960 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -20,11 +20,12 @@ using document::BucketSpace;
namespace storage::distributor {
BucketDBUpdater::BucketDBUpdater(Distributor& owner,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
DistributorComponentRegister& compReg)
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
- _distributorComponent(owner, bucketSpaceRepo, compReg, "Bucket DB Updater"),
+ _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
_sender(sender),
_transitionTimer(_distributorComponent.getClock())
{
@@ -53,6 +54,13 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden
}
bool
+BucketDBUpdater::shouldDeferStateEnabling() const noexcept
+{
+ return _distributorComponent.getDistributor().getConfig()
+ .allowStaleReadsDuringClusterStateTransitions();
+}
+
+bool
BucketDBUpdater::hasPendingClusterState() const
{
return static_cast<bool>(_pendingClusterState);
@@ -113,25 +121,35 @@ void
BucketDBUpdater::removeSuperfluousBuckets(
const lib::ClusterStateBundle& newState)
{
+ const bool move_to_read_only_db = shouldDeferStateEnabling();
for (auto &elem : _distributorComponent.getBucketSpaceRepo()) {
const auto &newDistribution(elem.second->getDistribution());
const auto &oldClusterState(elem.second->getClusterState());
auto &bucketDb(elem.second->getBucketDatabase());
+ auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase());
// Remove all buckets not belonging to this distributor, or
// being on storage nodes that are no longer up.
NodeRemover proc(
oldClusterState,
*newState.getDerivedClusterState(elem.first),
- _distributorComponent.getBucketIdFactory(),
_distributorComponent.getIndex(),
newDistribution,
_distributorComponent.getDistributor().getStorageNodeUpStates());
bucketDb.forEach(proc);
- for (const auto & entry :proc.getBucketsToRemove()) {
- bucketDb.remove(entry);
+ for (const auto & bucket : proc.getBucketsToRemove()) {
+ bucketDb.remove(bucket);
+ }
+ // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory...
+ for (const auto& bucket : proc.getNonOwnedBuckets()) {
+ if (move_to_read_only_db) {
+ auto db_entry = bucketDb.get(bucket);
+ readOnlyDb.update(db_entry); // TODO Entry move support
+ }
+ bucketDb.remove(bucket);
}
+
}
}
@@ -154,6 +172,14 @@ BucketDBUpdater::completeTransitionTimer()
}
void
+BucketDBUpdater::clearReadOnlyBucketRepoDatabases()
+{
+ for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) {
+ space.second->getBucketDatabase().clear();
+ }
+}
+
+void
BucketDBUpdater::storageDistributionChanged()
{
ensureTransitionTimerStarted();
@@ -169,6 +195,7 @@ BucketDBUpdater::storageDistributionChanged()
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
+ _distributorComponent.getReadOnlyBucketSpaceRepo(),
_distributorComponent.getUniqueTimestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
}
@@ -176,14 +203,22 @@ BucketDBUpdater::storageDistributionChanged()
void
BucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
{
- if (_pendingClusterState.get() &&
- _pendingClusterState->getCommand().get())
- {
+ if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
_distributorComponent.sendUp(
std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
}
}
+void
+BucketDBUpdater::replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion)
+{
+ auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
+ reply->setActualVersion(actualVersion);
+ _distributorComponent.sendUp(reply); // TODO let API accept rvalues
+}
+
bool
BucketDBUpdater::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& cmd)
@@ -214,6 +249,7 @@ BucketDBUpdater::onSetSystemState(
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
+ _distributorComponent.getReadOnlyBucketSpaceRepo(),
cmd,
_outdatedNodesMap,
_distributorComponent.getUniqueTimestamp());
@@ -225,6 +261,39 @@ BucketDBUpdater::onSetSystemState(
return true;
}
+bool
+BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+ if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
+ const auto pending_version = _pendingClusterState->clusterStateVersion();
+ if (pending_version == cmd->version()) {
+ if (isPendingClusterStateCompleted()) {
+ assert(_pendingClusterState->isDeferred());
+ activatePendingClusterState();
+ } else {
+ LOG(error, "Received cluster state activation for pending version %u "
+ "without pending state being complete yet. This is not expected, "
+ "as no activation should be sent before all distributors have "
+ "reported that state processing is complete.", pending_version);
+ replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+ return true;
+ }
+ } else {
+ replyToActivationWithActualVersion(*cmd, pending_version);
+ return true;
+ }
+ } else if (shouldDeferStateEnabling()) {
+ // Likely just a resend, but log warn for now to get a feel of how common it is.
+ LOG(warning, "Received cluster state activation command for version %u, which "
+ "has no corresponding pending state. Likely resent operation.", cmd->version());
+ } else {
+ LOG(debug, "Received cluster state activation command for version %u, but distributor "
+ "config does not have deferred activation enabled. Treating as no-op.", cmd->version());
+ }
+ // Fall through to next link in call chain that cares about this message.
+ return false;
+}
+
BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
{
if (_reply) {
@@ -485,14 +554,45 @@ BucketDBUpdater::isPendingClusterStateCompleted() const
void
BucketDBUpdater::processCompletedPendingClusterState()
{
+ if (_pendingClusterState->isDeferred()) {
+ LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
+ _pendingClusterState->clusterStateVersion());
+ assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+ // Sending down SetSystemState command will reach the state manager and a reply
+ // will be auto-sent back to the cluster controller in charge. After that, the
+ // cluster controller sends an explicit activation command as soon as all distributors
+ // have reported that their pending cluster states have completed.
+ // A booting distributor will treat itself as "system Up" before the state has actually
+ // taken effect via activation. External operation handler will keep operations from
+ // actually being scheduled until state has been activated. The external operation handler
+ // needs to be explicitly aware of the case where no state has yet been activated.
+ _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ _pendingClusterState->clearCommand();
+ return;
+ }
+ // Distribution config change or non-deferred cluster state. Immediately activate
+ // the pending state without being told to do so explicitly.
+ activatePendingClusterState();
+}
+
+void
+BucketDBUpdater::activatePendingClusterState()
+{
_pendingClusterState->mergeIntoBucketDatabases();
- if (_pendingClusterState->getCommand().get()) {
+ if (_pendingClusterState->isVersionedTransition()) {
+ LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
enableCurrentClusterStateBundleInDistributor();
- _distributorComponent.getDistributor().getMessageSender().sendDown(
- _pendingClusterState->getCommand());
+ if (_pendingClusterState->hasCommand()) {
+ _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ }
addCurrentStateToClusterStateHistory();
} else {
+ LOG(debug, "Activating pending distribution config");
+ // TODO distribution changes cannot currently be deferred as they are not
+ // initiated by the cluster controller!
_distributorComponent.getDistributor().notifyDistributionChangeEnabled();
}
@@ -500,13 +600,14 @@ BucketDBUpdater::processCompletedPendingClusterState()
_outdatedNodesMap.clear();
sendAllQueuedBucketRechecks();
completeTransitionTimer();
+ clearReadOnlyBucketRepoDatabases();
}
void
BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
{
const lib::ClusterStateBundle& state(
- _pendingClusterState->getCommand()->getClusterStateBundle());
+ _pendingClusterState->getNewClusterStateBundle());
LOG(debug,
"BucketDBUpdater finished processing state %s",
@@ -688,7 +789,7 @@ BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e)
return true;
}
if (!distributorOwnsBucket(bucketId)) {
- _removedBuckets.push_back(bucketId);
+ _nonOwnedBuckets.push_back(bucketId);
return true;
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index ea67e7ea72a..393e1e2524e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -33,7 +33,8 @@ public:
using OutdatedNodes = dbtransition::OutdatedNodes;
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
BucketDBUpdater(Distributor& owner,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
DistributorComponentRegister& compReg);
~BucketDBUpdater();
@@ -43,6 +44,7 @@ public:
void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
@@ -124,6 +126,7 @@ private:
}
};
+ bool shouldDeferStateEnabling() const noexcept;
bool hasPendingClusterState() const;
bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
@@ -131,6 +134,7 @@ private:
const BucketRequest& req);
bool isPendingClusterStateCompleted() const;
void processCompletedPendingClusterState();
+ void activatePendingClusterState();
void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req);
void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
@@ -141,6 +145,7 @@ private:
BucketListMerger::BucketList& existing) const;
void ensureTransitionTimerStarted();
void completeTransitionTimer();
+ void clearReadOnlyBucketRepoDatabases();
/**
* Adds all buckets contained in the bucket database
* that are either contained
@@ -161,6 +166,9 @@ private:
void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState);
void replyToPreviousPendingClusterStateIfAny();
+ void replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion);
void enableCurrentClusterStateBundleInDistributor();
void addCurrentStateToClusterStateHistory();
@@ -191,30 +199,35 @@ private:
public:
NodeRemover(const lib::ClusterState& oldState,
const lib::ClusterState& s,
- [[maybe_unused]] const document::BucketIdFactory& factory,
uint16_t localIndex,
const lib::Distribution& distribution,
const char* upStates)
: _oldState(oldState),
_state(s),
+ _nonOwnedBuckets(),
+ _removedBuckets(),
_localIndex(localIndex),
_distribution(distribution),
_upStates(upStates) {}
- ~NodeRemover();
+ ~NodeRemover() override;
bool process(BucketDatabase::Entry& e) override;
void logRemove(const document::BucketId& bucketId, const char* msg) const;
bool distributorOwnsBucket(const document::BucketId&) const;
- const std::vector<document::BucketId>& getBucketsToRemove() const {
+ const std::vector<document::BucketId>& getBucketsToRemove() const noexcept {
return _removedBuckets;
}
+ const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept {
+ return _nonOwnedBuckets;
+ }
private:
void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
void removeEmptyBucket(const document::BucketId& bucketId);
const lib::ClusterState _oldState;
const lib::ClusterState _state;
+ std::vector<document::BucketId> _nonOwnedBuckets;
std::vector<document::BucketId> _removedBuckets;
uint16_t _localIndex;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 1664dd0d9a1..c92dfbdc14e 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -67,15 +67,16 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_compReg(compReg),
_component(compReg, "distributor"),
_bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()),
+ _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()),
_metrics(new DistributorMetricSet(_component.getLoadTypes()->getMetricLoadTypes())),
_operationOwner(*this, _component.getClock()),
_maintenanceOperationOwner(*this, _component.getClock()),
_pendingMessageTracker(compReg),
- _bucketDBUpdater(*this, *_bucketSpaceRepo, *this, compReg),
+ _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg),
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
- _idealStateManager(*this, *_bucketSpaceRepo, compReg, manageActiveBucketCopies),
- _externalOperationHandler(*this, *_bucketSpaceRepo, _idealStateManager, compReg),
+ _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
+ _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg),
_threadPool(threadPool),
_initializingIsUp(true),
_doneInitializeHandler(doneInitHandler),
@@ -575,16 +576,20 @@ void
Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
- _bucketSpaceRepo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
- _bucketSpaceRepo->get(document::FixedBucketSpaces::global_space()).setDistribution(std::move(global_distr));
+ for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
+ repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
+ repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
+ }
}
void
Distributor::propagateClusterStates()
{
- for (auto &iter : *_bucketSpaceRepo) {
- iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first));
+ for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { // apply the same states to the regular and the read-only repo
+ for (auto& iter : *repo) {
+ iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first)); // iter.first selects the derived state, iter.second receives it
+ }
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index fb8a9fb4299..cd24b91eba2 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -158,6 +158,13 @@ public:
DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; }
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; }
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept {
+ return *_readOnlyBucketSpaceRepo;
+ }
+    // Const access to the read-only repo. The "ReadyOnly" spelling is a legacy typo, kept as an alias so existing callers keep compiling.
+    const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const noexcept { return *_readOnlyBucketSpaceRepo; }
+    const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept { return getReadOnlyBucketSpaceRepo(); }
+
private:
friend class Distributor_Test;
friend class BucketDBUpdaterTest;
@@ -244,6 +251,10 @@ private:
DistributorComponentRegister& _compReg;
storage::DistributorComponent _component;
std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
+ // Read-only bucket space repo with DBs that only contain buckets transiently
+ // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo
+ // and the DBs are empty during non-transition phases.
+ std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo;
std::shared_ptr<DistributorMetricSet> _metrics;
OperationOwner _operationOwner;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index d3d07350d35..9bd215b9644 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -15,16 +15,18 @@ namespace storage::distributor {
DistributorComponent::DistributorComponent(
DistributorInterface& distributor,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorComponentRegister& compReg,
const std::string& name)
: storage::DistributorComponent(compReg, name),
_distributor(distributor),
- _bucketSpaceRepo(bucketSpaceRepo)
+ _bucketSpaceRepo(bucketSpaceRepo),
+ _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo)
{
}
-DistributorComponent::~DistributorComponent() {}
+DistributorComponent::~DistributorComponent() = default;
void
DistributorComponent::sendDown(const api::StorageMessage::SP& msg)
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index 561904cee8d..f2aea89d47c 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -29,11 +29,12 @@ class DistributorComponent : public storage::DistributorComponent
{
public:
DistributorComponent(DistributorInterface& distributor,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
- DistributorComponentRegister& compReg,
- const std::string& name);
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ DistributorComponentRegister& compReg,
+ const std::string& name);
- ~DistributorComponent();
+ ~DistributorComponent() override;
/**
* Returns the ownership status of a bucket as decided with the given
@@ -153,6 +154,9 @@ public:
DistributorBucketSpaceRepo &getBucketSpaceRepo() { return _bucketSpaceRepo; }
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const { return _bucketSpaceRepo; }
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() { return _readOnlyBucketSpaceRepo; }
+ const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const { return _readOnlyBucketSpaceRepo; }
+
/**
* Finds a bucket that has the same direct parent as the given bucket
* (i.e. split one bit less), but different bit in the most used bit.
@@ -179,7 +183,8 @@ private:
protected:
- DistributorBucketSpaceRepo &_bucketSpaceRepo;
+ DistributorBucketSpaceRepo& _bucketSpaceRepo;
+ DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo;
vespalib::Lock _sync;
};
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
index 927dc06182d..83923a1f00e 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
@@ -17,7 +17,7 @@ DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt)
removelocations(lt, PersistenceOperationMetricSet("removelocations"), this),
gets(lt, PersistenceOperationMetricSet("gets"), this),
stats(lt, PersistenceOperationMetricSet("stats"), this),
- multioperations(lt, PersistenceOperationMetricSet("multioperations"), this),
+ getbucketlists(lt, PersistenceOperationMetricSet("getbucketlists"), this),
visits(lt, VisitorMetricSet(), this),
stateTransitionTime("state_transition_time", {},
"Time it takes to complete a cluster state transition. If a "
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h
index 5a64027f500..dfe976a89ab 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.h
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.h
@@ -20,7 +20,7 @@ public:
metrics::LoadMetric<PersistenceOperationMetricSet> removelocations;
metrics::LoadMetric<PersistenceOperationMetricSet> gets;
metrics::LoadMetric<PersistenceOperationMetricSet> stats;
- metrics::LoadMetric<PersistenceOperationMetricSet> multioperations;
+ metrics::LoadMetric<PersistenceOperationMetricSet> getbucketlists;
metrics::LoadMetric<VisitorMetricSet> visits;
metrics::DoubleAverageMetric stateTransitionTime;
metrics::DoubleAverageMetric recoveryModeTime;
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index b22592af327..1b88f02cac6 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -20,14 +20,18 @@
#include "distributor_bucket_space.h"
#include <vespa/log/log.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+
LOG_SETUP(".distributor.manager");
namespace storage::distributor {
-ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo,
+ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator& gen,
DistributorComponentRegister& compReg)
- : DistributorComponent(owner, bucketSpaceRepo, compReg, "External operation handler"),
+ : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"),
_operationGenerator(gen),
_rejectFeedBeforeTimeReached() // At epoch
{ }
@@ -68,19 +72,69 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
return true;
}
+void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result) { // Answers cmd right away: creates its reply, sets result on it and sends it up.
+ api::StorageReply::UP reply(cmd.makeReply());
+ reply->setResult(result);
+ sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); // reply.release() passes the raw pointer on to the shared_ptr
+}
+
+void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { // Bounces cmd with WRONG_DISTRIBUTION and the default-space cluster state string, or with BUSY when that state has version 0.
+ // Distributor ownership is equal across bucket spaces, so always send back default space state.
+ // This also helps clients avoid getting confused by possibly observing different actual
+ // (derived) state strings for global/non-global document types for the same state version.
+ // Similarly, if we've yet to activate any version at all we send back BUSY instead
+ // of a suspiciously empty WrongDistributionReply.
+ // TODO consider NOT_READY instead of BUSY once we're sure this won't cause any other issues.
+ const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ if (cluster_state.getVersion() != 0) {
+ auto cluster_state_str = cluster_state.toString();
+ LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str());
+ bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str));
+ } else { // Only valid for empty startup state
+ LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY");
+ bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet"));
+ }
+}
+
+void ExternalOperationHandler::bounce_with_busy_during_state_transition(
+        api::StorageCommand& cmd,
+        const lib::ClusterState& current_state,
+        const lib::ClusterState& pending_state)
+{
+    // Rejects cmd with BUSY; the reply message names the version of current_state and of
+    // pending_state so the receiver can see which transition is in progress.
+    auto status_str = vespalib::make_string("Currently pending cluster state transition"
+                                            " from version %u to %u",
+                                            current_state.getVersion(), pending_state.getVersion());
+
+    // Delegate reply construction to the shared helper instead of duplicating it here.
+    bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, status_str));
+}
+
bool
ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageCommand& cmd,
const document::BucketId &bucketId,
PersistenceOperationMetricSet& persistenceMetrics)
{
document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId);
- if (!checkDistribution(cmd, bucket)) {
+ if (!ownsBucketInCurrentState(bucket)) {
LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
cmd.toString().c_str(), bucket.toString().c_str());
-
+ bounce_with_wrong_distribution(cmd);
persistenceMetrics.failures.wrongdistributor.inc();
return false;
}
+
+ auto pending = getDistributor().checkOwnershipInPendingState(bucket);
+ if (!pending.isOwned()) {
+ // We return BUSY here instead of WrongDistributionReply to avoid clients potentially
+ // ping-ponging between cluster state versions during a state transition.
+ auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ auto& pending_state = pending.getNonOwnedState();
+ bounce_with_busy_during_state_transition(cmd, current_state, pending_state);
+ return false;
+ }
+
if (!checkSafeTimeReached(cmd)) {
persistenceMetrics.failures.safe_time_not_reached.inc();
return false;
@@ -111,6 +165,35 @@ bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) con
return handle.valid();
}
+template <typename Func> // Func is called with the DistributorBucketSpaceRepo selected below
+void ExternalOperationHandler::bounce_or_invoke_read_only_op(
+ api::StorageCommand& cmd,
+ const document::Bucket& bucket,
+ PersistenceOperationMetricSet& metrics,
+ Func func)
+{
+ if (!ownsBucketInCurrentState(bucket)) { // not owned in the current state: bounce, count the failure, never call func
+ LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
+ cmd.toString().c_str(), bucket.toString().c_str());
+ bounce_with_wrong_distribution(cmd);
+ metrics.failures.wrongdistributor.inc();
+ return;
+ }
+
+ auto pending = getDistributor().checkOwnershipInPendingState(bucket);
+ if (pending.isOwned()) {
+ func(_bucketSpaceRepo); // owned per the pending-state check as well: run against the regular repo
+ } else {
+ if (getDistributor().getConfig().allowStaleReadsDuringClusterStateTransitions()) { // config decides whether a non-owned-in-pending read may still be served
+ func(_readOnlyBucketSpaceRepo); // stale reads allowed: run against the read-only repo instead
+ } else {
+ auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ auto& pending_state = pending.getNonOwnedState();
+ bounce_with_busy_during_state_transition(cmd, current_state, pending_state); // stale reads disallowed: reply BUSY, func is not called
+ }
+ }
+}
+
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
{
auto& metrics = getMetrics().puts[cmd->getLoadType()];
@@ -186,10 +269,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
RemoveLocationOperation::getBucketId(*this, *cmd, bid);
document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid);
- if (!checkDistribution(*cmd, bucket)) {
- LOG(debug, "Distributor manager received %s with wrong distribution", cmd->toString().c_str());
-
- getMetrics().removelocations[cmd->getLoadType()].failures.wrongdistributor.inc();
+ auto& metrics = getMetrics().removelocations[cmd->getLoadType()];
+ if (!checkTimestampMutationPreconditions(*cmd, bucket.getBucketId(), metrics)) {
return true;
}
@@ -201,43 +282,38 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
{
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
- if (!checkDistribution(*cmd, bucket)) {
- LOG(debug, "Distributor manager received get for %s, bucket %s with wrong distribution",
- cmd->getDocumentId().toString().c_str(), bucket.toString().c_str());
-
- getMetrics().gets[cmd->getLoadType()].failures.wrongdistributor.inc();
- return true;
- }
-
- _op = std::make_shared<GetOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
- cmd, getMetrics().gets[cmd->getLoadType()]);
+ auto& metrics = getMetrics().gets[cmd->getLoadType()];
+ bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) {
+ _op = std::make_shared<GetOperation>(*this, bucket_space_repo.get(cmd->getBucket().getBucketSpace()),
+ cmd, metrics);
+ });
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket)
{
- if (!checkDistribution(*cmd, cmd->getBucket())) {
- return true;
- }
- auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = std::make_shared<StatBucketOperation>(*this, distributorBucketSpace, cmd);
+ auto& metrics = getMetrics().stats[cmd->getLoadType()];
+ bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
+ auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
+ _op = std::make_shared<StatBucketOperation>(*this, bucket_space, cmd);
+ });
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList)
{
- if (!checkDistribution(*cmd, cmd->getBucket())) {
- return true;
- }
- auto bucketSpace(cmd->getBucket().getBucketSpace());
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace));
- auto &bucketDatabase(distributorBucketSpace.getBucketDatabase());
- _op = std::make_shared<StatBucketListOperation>(bucketDatabase, _operationGenerator, getIndex(), cmd);
+ auto& metrics = getMetrics().getbucketlists[cmd->getLoadType()];
+ bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
+ auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
+ auto& bucket_database = bucket_space.getBucketDatabase();
+ _op = std::make_shared<StatBucketListOperation>(bucket_database, _operationGenerator, getIndex(), cmd);
+ });
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor)
{
+ // TODO same handling as Gets (VisitorOperation needs to change)
const DistributorConfiguration& config(getDistributor().getConfig());
VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor());
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index c198fe30159..655feb5d00c 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -37,10 +37,11 @@ public:
ExternalOperationHandler(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator&,
DistributorComponentRegister& compReg);
- ~ExternalOperationHandler();
+ ~ExternalOperationHandler() override;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg,
Operation::SP& operation);
@@ -55,6 +56,18 @@ private:
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
+ template <typename Func>
+ void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
+ const document::Bucket& bucket,
+ PersistenceOperationMetricSet& metrics,
+ Func f);
+
+ void bounce_with_wrong_distribution(api::StorageCommand& cmd);
+ void bounce_with_busy_during_state_transition(api::StorageCommand& cmd,
+ const lib::ClusterState& current_state,
+ const lib::ClusterState& pending_state);
+ void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result);
+
bool checkSafeTimeReached(api::StorageCommand& cmd);
api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime);
bool checkTimestampMutationPreconditions(
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 77b924ad351..5a1ff31e2e7 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -26,11 +26,12 @@ namespace distributor {
IdealStateManager::IdealStateManager(
Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorComponentRegister& compReg,
bool manageActiveBucketCopies)
: HtmlStatusReporter("idealstateman", "Ideal state manager"),
_metrics(new IdealStateMetricSet),
- _distributorComponent(owner, bucketSpaceRepo, compReg, "Ideal state manager"),
+ _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Ideal state manager"),
_bucketSpaceRepo(bucketSpaceRepo)
{
_distributorComponent.registerStatusPage(*this);
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index c8be2a40ad7..3bb6d0dd757 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -37,6 +37,7 @@ public:
IdealStateManager(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorComponentRegister& compReg,
bool manageActiveBucketCopies);
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 198c588dfd1..3936f13077e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -34,6 +34,9 @@ public:
bool hasConsistentCopies() const;
+ // Exposed for unit testing. TODO feels a bit dirty :I
+ const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
+
private:
class GroupId {
public:
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 5f74a82c28a..6cba7084037 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -27,7 +27,8 @@ PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
@@ -40,6 +41,9 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
+ _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
+ _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
+ _isVersionedTransition(true),
_bucketOwnershipTransfer(false),
_pendingTransitions()
{
@@ -51,7 +55,8 @@ PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
_prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
@@ -61,6 +66,9 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
+ _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
+ _clusterStateVersion(0),
+ _isVersionedTransition(false),
_bucketOwnershipTransfer(true),
_pendingTransitions()
{
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index b96ba8cbbd7..cedc0573381 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -45,15 +45,16 @@ public:
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
{
- return std::unique_ptr<PendingClusterState>(
- new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd,
- outdatedNodesMap,
- creationTimestamp));
+ // Naked new due to private constructor
+ return std::unique_ptr<PendingClusterState>(new PendingClusterState(
+ clock, clusterInfo, sender, bucketSpaceRepo, readOnlyBucketSpaceRepo,
+ newStateCmd, outdatedNodesMap, creationTimestamp));
}
/**
@@ -64,16 +65,19 @@ public:
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp)
{
- return std::unique_ptr<PendingClusterState>(
- new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp));
+ // Naked new due to private constructor
+ return std::unique_ptr<PendingClusterState>(new PendingClusterState(
+ clock, clusterInfo, sender, bucketSpaceRepo,
+ readOnlyBucketSpaceRepo, creationTimestamp));
}
PendingClusterState(const PendingClusterState &) = delete;
PendingClusterState & operator = (const PendingClusterState &) = delete;
- ~PendingClusterState();
+ ~PendingClusterState() override;
/**
* Adds the info from the reply to our list of information.
@@ -104,10 +108,31 @@ public:
return _bucketOwnershipTransfer;
}
+ bool hasCommand() const noexcept {
+ return (_cmd.get() != nullptr);
+ }
+
std::shared_ptr<api::SetSystemStateCommand> getCommand() {
return _cmd;
}
+ bool isVersionedTransition() const noexcept {
+ return _isVersionedTransition;
+ }
+
+ uint32_t clusterStateVersion() const noexcept {
+ return _clusterStateVersion;
+ }
+
+ bool isDeferred() const noexcept {
+ return (isVersionedTransition()
+ && _newClusterStateBundle.deferredActivation());
+ }
+
+ void clearCommand() {
+ _cmd.reset();
+ }
+
const lib::ClusterStateBundle& getNewClusterStateBundle() const {
return _newClusterStateBundle;
}
@@ -141,7 +166,8 @@ private:
const framework::Clock&,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp);
@@ -154,7 +180,8 @@ private:
const framework::Clock&,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo &bucketSpaceRepo,
+ DistributorBucketSpaceRepo& bucketSpaceRepo,
+ DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp);
struct BucketSpaceAndNode {
@@ -204,8 +231,10 @@ private:
api::Timestamp _creationTimestamp;
DistributorMessageSender& _sender;
- DistributorBucketSpaceRepo &_bucketSpaceRepo;
-
+ DistributorBucketSpaceRepo& _bucketSpaceRepo;
+ DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo;
+ uint32_t _clusterStateVersion;
+ bool _isVersionedTransition;
bool _bucketOwnershipTransfer;
std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
};
diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp
index 0541c7322f1..fdbfd553315 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.cpp
+++ b/storage/src/vespa/storage/storageserver/bouncer.cpp
@@ -235,6 +235,7 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
case api::MessageType::SETNODESTATE_ID:
case api::MessageType::GETNODESTATE_ID:
case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
case api::MessageType::NOTIFYBUCKETCHANGE_ID:
// state commands are always ok
return false;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 7fb85ef0ecc..978d434847e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -622,20 +622,25 @@ CommunicationManager::sendDirectRPCReply(
{
std::string requestName(request.getMethodName());
if (requestName == "getnodestate3") {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
request.addReturnString(gns.getNodeInfo().c_str());
LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str());
} else if (requestName == "getnodestate2") {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
} else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
// No data to return
+ } else if (requestName == "activate_cluster_state_version") {
+ auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply));
+ request.addReturnInt(activate_reply.actualVersion());
+ LOGBP(debug, "sending activate_cluster_state_version reply for version %u with actual version %u ",
+ activate_reply.activateVersion(), activate_reply.actualVersion());
} else {
request.addReturnInt(reply->getResult().getResult());
request.addReturnString(reply->getResult().getMessage().c_str());
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index e31bded772c..ec488b25714 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -92,6 +92,11 @@ FNetListener::initRPC()
rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
rb.ParamDesc("payload", "Binary Slime format payload");
//-------------------------------------------------------------------------
+ rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this);
+ rb.MethodDesc("Explicitly activates an already prepared cluster state version");
+ rb.ParamDesc("activate_version", "Expected cluster state version to activate");
+ rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC");
+ //-------------------------------------------------------------------------
rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
@@ -203,6 +208,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what());
return;
}
+ LOG(debug, "Got state bundle %s", state_bundle->toString().c_str());
// TODO add constructor taking in shared_ptr directly instead?
auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
@@ -211,4 +217,20 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
detach_and_forward_to_enqueuer(std::move(cmd), req);
}
+void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) { // Handler for the "activate_cluster_state_version" RPC: wraps the requested version in a storage command and forwards it.
+ if (_closed) { // Closed listener: fail the request with an error instead of creating a command.
+ LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed");
+ req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
+ return;
+ }
+
+ const uint32_t activate_version = req->GetParams()->GetValue(0)._intval32; // Parameter 0 is the version to activate (the method is defined with param spec "i").
+ auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version);
+ cmd->setPriority(api::StorageMessage::VERYHIGH); // NOTE(review): presumably so activation is not delayed behind ordinary traffic - confirm.
+
+ LOG(debug, "Got state activation request for version %u", activate_version);
+
+ detach_and_forward_to_enqueuer(std::move(cmd), req); // Same hand-off call as RPC_setDistributionStates; NOTE(review): helper body not visible here - presumably the RPC is answered later, confirm.
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index abcba18e0be..2097be15491 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -26,6 +26,7 @@ public:
void RPC_setSystemState2(FRT_RPCRequest *req);
void RPC_getCurrentTime(FRT_RPCRequest *req);
void RPC_setDistributionStates(FRT_RPCRequest* req);
+ void RPC_activateClusterStateVersion(FRT_RPCRequest* req);
void registerHandle(vespalib::stringref handle);
void close();
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
index 5b7e0ab4621..1f854bc724e 100644
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
@@ -53,6 +53,9 @@ EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
{
vespalib::Slime slime;
Cursor& root = slime.setObject();
+ if (bundle.deferredActivation()) {
+ root.setBool("deferred-activation", bundle.deferredActivation());
+ }
Cursor& states = root.setObject("states");
states.setString("baseline", serialize_state(*bundle.getBaselineClusterState()));
Cursor& spaces = states.setObject("spaces");
@@ -79,6 +82,7 @@ namespace {
static const Memory StatesField("states");
static const Memory BaselineField("baseline");
static const Memory SpacesField("spaces");
+static const Memory DeferredActivationField("deferred-activation");
struct StateInserter : vespalib::slime::ObjectTraverser {
lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;
@@ -118,8 +122,11 @@ std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::dec
lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
StateInserter inserter(space_states);
spaces.traverse(inserter);
+
+ const bool deferred_activation = root[DeferredActivationField].asBool(); // Defaults to false if not set.
+
// TODO add shared_ptr constructor for baseline?
- return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states));
+ return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), deferred_activation);
}
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 95cb5dec696..af01a880fea 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -514,6 +514,19 @@ StateManager::onSetSystemState(
return true;
}
+bool
+StateManager::onActivateClusterStateVersion(
+ const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{ // Answers an activation command with the version of the system state currently held; cmd is only used to construct the reply.
+ auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(*cmd);
+ { // Scope bounds the lifetime of the LockGuard on _stateLock.
+ vespalib::LockGuard lock(_stateLock);
+ reply->setActualVersion(_systemState ? _systemState->getVersion() : 0); // Reports version 0 when no system state is set.
+ }
+ sendUp(reply); // Sent after the guard's scope above has closed.
+ return true; // NOTE(review): presumably marks the command as handled, as onSetSystemState also returns true - confirm.
+}
+
void
StateManager::run(framework::ThreadHandle& thread)
{
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 0bacd41f6d9..57f0e02a136 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -137,6 +137,7 @@ private:
bool onGetNodeState(const std::shared_ptr<api::GetNodeStateCommand>&) override;
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>&) override;
/**
* _stateLock MUST NOT be held while calling.