summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-08-22 15:00:25 +0200
committerGitHub <noreply@github.com>2022-08-22 15:00:25 +0200
commit9afa884388804986f5c22223ab13f1649da9bc68 (patch)
treeeb4d8d51ee1d8919c91cb83fa8cc9062fab91648 /storage
parent5a9875b516d43bfa367b5f3083f9bc58e7d8f795 (diff)
parent54693fc154c0fabae6ac82607765a22057977bbb (diff)
Merge pull request #23701 from vespa-engine/vekterli/two-phase-document-gc
Add support for two-phase document garbage collection [run-systemtest]
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h5
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp20
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp335
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp7
-rw-r--r--storage/src/tests/persistence/processalltest.cpp94
-rw-r--r--storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE66
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp50
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h9
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def8
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributormessagesender.h1
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features.h7
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp238
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h46
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h5
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp41
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto28
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto3
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp61
-rw-r--r--storage/src/vespa/storageapi/message/bucket.h3
-rw-r--r--storage/src/vespa/storageapi/message/removelocation.cpp8
-rw-r--r--storage/src/vespa/storageapi/message/removelocation.h32
29 files changed, 955 insertions, 135 deletions
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index 421fe2216ca..ba552217e07 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -106,6 +106,11 @@ public:
return *_operation_sequencer;
}
+ distributor::OperationSequencer& operation_sequencer() noexcept override {
+ assert(_operation_sequencer);
+ return *_operation_sequencer;
+ }
+
void set_operation_sequencer(distributor::OperationSequencer& op_seq) {
_operation_sequencer = &op_seq;
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 709f2e6cdc5..91b43ebce16 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -191,6 +191,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}
+ void configure_enable_two_phase_garbage_collection(bool use_two_phase) {
+ ConfigBuilder builder;
+ builder.enableTwoPhaseGarbageCollection = use_two_phase;
+ configure_stripe(builder);
+ }
+
bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
@@ -1012,4 +1018,18 @@ TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_
EXPECT_FALSE(getConfig().use_unordered_merge_chaining());
}
+TEST_F(DistributorStripeTest, enable_two_phase_gc_config_is_propagated_to_internal_config)
+{
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ // Feature is currently disabled by default. TODO change once we roll it out.
+ EXPECT_FALSE(getConfig().enable_two_phase_garbage_collection());
+
+ configure_enable_two_phase_garbage_collection(true);
+ EXPECT_TRUE(getConfig().enable_two_phase_garbage_collection());
+
+ configure_enable_two_phase_garbage_collection(false);
+ EXPECT_FALSE(getConfig().enable_two_phase_garbage_collection());
+}
+
}
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 07ef4604e20..4f9fd25098b 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -11,33 +11,83 @@
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
+using document::BucketId;
+using document::DocumentId;
+using document::FixedBucketSpaces;
using namespace ::testing;
namespace storage::distributor {
struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
+ BucketId _bucket_id;
+ OperationSequencer _operation_sequencer;
+ uint32_t _gc_start_time_sec;
+ spi::IdAndTimestamp _e1;
+ spi::IdAndTimestamp _e2;
+ spi::IdAndTimestamp _e3;
+ spi::IdAndTimestamp _e4;
+
+
+ GarbageCollectionOperationTest()
+ : _bucket_id(16, 1),
+ _operation_sequencer(),
+ _gc_start_time_sec(34),
+ _e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)),
+ _e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)),
+ _e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)),
+ _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400))
+ {}
+
void SetUp() override {
createLinks();
- enable_cluster_state("distributor:1 storage:2");
- addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
+ enable_cluster_state("version:10 distributor:1 storage:2");
+ addNodesToBucketDB(_bucket_id, "0=250/50/300,1=250/50/300");
auto cfg = make_config();
cfg->setGarbageCollection("music.date < 34", 3600s);
configure_stripe(cfg);
- getClock().setAbsoluteTimeInSeconds(34);
+ getClock().setAbsoluteTimeInSeconds(_gc_start_time_sec);
+ _sender.set_operation_sequencer(_operation_sequencer);
};
void TearDown() override {
close();
}
+ void enable_two_phase_gc() {
+ NodeSupportedFeatures with_two_phase;
+ with_two_phase.two_phase_remove_location = true;
+ set_node_supported_features(0, with_two_phase);
+ set_node_supported_features(1, with_two_phase);
+
+ config_enable_two_phase_gc(true);
+ }
+
+ void config_enable_two_phase_gc(bool enabled) {
+ auto config = make_config();
+ config->set_enable_two_phase_garbage_collection(enabled);
+ configure_stripe(std::move(config));
+ }
+
std::shared_ptr<GarbageCollectionOperation> create_op() {
auto op = std::make_shared<GarbageCollectionOperation>(
- dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ dummy_cluster_context, BucketAndNodes(makeDocumentBucket(_bucket_id),
toVector<uint16_t>(0, 1)));
op->setIdealStateManager(&getIdealStateManager());
return op;
}
+ static std::shared_ptr<api::RemoveLocationCommand> as_remove_location_command(const std::shared_ptr<api::StorageCommand>& cmd) {
+ auto msg = std::dynamic_pointer_cast<api::RemoveLocationCommand>(cmd);
+ assert(msg);
+ return msg;
+ }
+
+ static std::shared_ptr<api::RemoveLocationReply> make_remove_location_reply(api::StorageCommand& msg) {
+ auto reply = std::shared_ptr<api::StorageReply>(msg.makeReply());
+ assert(reply->getType() == api::MessageType::REMOVELOCATION_REPLY);
+ return std::dynamic_pointer_cast<api::RemoveLocationReply>(reply);
+ }
+
// FIXME fragile to assume that send order == node index, but that's the way it currently works
void reply_to_nth_request(GarbageCollectionOperation& op, size_t n,
uint32_t bucket_info_checksum, uint32_t n_docs_removed) {
@@ -51,8 +101,14 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
op.receive(_sender, reply);
}
+ void assert_bucket_last_gc_timestamp_is(uint32_t gc_time) {
+ BucketDatabase::Entry entry = getBucket(_bucket_id);
+ ASSERT_TRUE(entry.valid());
+ EXPECT_EQ(entry->getLastGarbageCollectionTime(), gc_time);
+ }
+
void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) {
- BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
+ BucketDatabase::Entry entry = getBucket(_bucket_id);
ASSERT_TRUE(entry.valid());
ASSERT_EQ(entry->getNodeCount(), info.size());
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
@@ -69,11 +125,21 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
assert(gc_metrics);
return gc_metrics->documents_removed.getValue();
}
+
+ void assert_gc_op_completed_ok_without_second_phase(GarbageCollectionOperation& op) {
+ ASSERT_EQ(0u, _sender.commands().size());
+ EXPECT_TRUE(op.is_done());
+ EXPECT_TRUE(op.ok()); // It's not a failure to have nothing to do
+ // GC timestamp must be updated so we can move on to another bucket.
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_last_gc_timestamp_is(_gc_start_time_sec));
+ EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed
+ }
};
-TEST_F(GarbageCollectionOperationTest, simple) {
+TEST_F(GarbageCollectionOperationTest, simple_legacy) {
auto op = create_op();
op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_FALSE(op->is_two_phase());
ASSERT_EQ(2, _sender.commands().size());
EXPECT_EQ(0u, gc_removed_documents_metric());
@@ -119,4 +185,261 @@ TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_s
ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34));
}
+TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and_explicit_node_support) {
+ NodeSupportedFeatures with_two_phase;
+ with_two_phase.two_phase_remove_location = true;
+ set_node_supported_features(1, with_two_phase);
+
+ config_enable_two_phase_gc(true);
+
+ // Config enabled, but only 1 node says it supports two-phase RemoveLocation
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_FALSE(op->is_two_phase());
+
+ // Node 0 suddenly upgraded...!
+ set_node_supported_features(0, with_two_phase);
+ op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_TRUE(op->is_two_phase());
+
+ // But doesn't matter if two-phase GC is config-disabled
+ config_enable_two_phase_gc(false);
+
+ op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_FALSE(op->is_two_phase());
+}
+
+TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_locations_with_provided_gc_pri) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->setPriority(getConfig().getMaintenancePriorities().garbageCollection);
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ for (int i : {0, 1}) {
+ auto cmd = as_remove_location_command(_sender.command(i));
+ EXPECT_TRUE(cmd->only_enumerate_docs());
+ EXPECT_EQ(cmd->getPriority(), getConfig().getMaintenancePriorities().garbageCollection);
+ }
+}
+
+TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1, _e2, _e3});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ ASSERT_EQ(0u, _sender.commands().size()); // No phase 2 yet, must get reply from all nodes
+ op->receive(_sender, r2);
+ ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent
+
+ std::vector<spi::IdAndTimestamp> expected({_e2, _e3});
+ for (int i : {0, 1}) {
+ auto cmd = as_remove_location_command(_sender.command(i));
+ EXPECT_FALSE(cmd->only_enumerate_docs());
+ EXPECT_EQ(cmd->explicit_remove_set(), expected);
+ EXPECT_EQ(cmd->getPriority(), getConfig().default_external_feed_priority());
+ }
+}
+
+TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ _sender.commands().clear();
+ // Empty result sets in both replies
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
+}
+
+TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ // No docs in common
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e2});
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
+}
+
+// We explicitly test the case where the first reply has an empty result set since we internally
+// establish the baseline candidate set from the first reply. This test case leaks some internal
+// implementation details, but such is life.
+TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e1, _e2, _e3, _e4});
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
+}
+
+
+TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1, _e2, _e3});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+ ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent
+
+ r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_documents_removed(3);
+ r1->setBucketInfo(api::BucketInfo(0x1234, 90, 500));
+
+ r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_documents_removed(3);
+ r2->setBucketInfo(api::BucketInfo(0x4567, 90, 500));
+
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_TRUE(op->ok());
+ EXPECT_TRUE(op->is_done());
+ EXPECT_EQ(3u, gc_removed_documents_metric());
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(0x1234, 90, 500),
+ api::BucketInfo(0x4567, 90, 500)},
+ _gc_start_time_sec));
+}
+
+struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationTest {
+ std::shared_ptr<GarbageCollectionOperation> _op;
+ std::shared_ptr<api::RemoveLocationReply> _r1;
+ std::shared_ptr<api::RemoveLocationReply> _r2;
+
+ void SetUp() override {
+ GarbageCollectionOperationTest::SetUp();
+
+ enable_two_phase_gc();
+ _op = create_op();
+ _op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ _r1 = make_remove_location_reply(*_sender.command(0));
+ _r1->set_selection_matches({_e1});
+ _r2 = make_remove_location_reply(*_sender.command(1));
+ _r2->set_selection_matches({_e1});
+ }
+
+ void receive_phase1_replies() {
+ _sender.commands().clear();
+ _op->receive(_sender, _r1);
+ _op->receive(_sender, _r2);
+ }
+
+ void receive_phase1_replies_and_assert_no_phase_2_started() {
+ receive_phase1_replies();
+ ASSERT_EQ(0u, _sender.commands().size());
+ EXPECT_TRUE(_op->is_done());
+ EXPECT_FALSE(_op->ok());
+ // GC not completed, so timestamp/bucket DB are _not_ updated
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), // test init values
+ api::BucketInfo(250, 50, 300)},
+ 0/*GC start timestamp*/));
+ EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed
+ }
+};
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_failure_during_first_phase) {
+ _r2->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "oh no"));
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_cluster_state_version_changed_between_phases) {
+ enable_cluster_state("version:11 distributor:1 storage:2"); // version 10 -> 11
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_pending_cluster_state_between_phases) {
+ simulate_set_pending_cluster_state("version:11 distributor:1 storage:2"); // Pending; not enabled yet
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_inconsistently_split_between_phases) {
+ // Add a logical child of _bucket_id to the bucket tree. This implies an inconsistent split, as we never
+ // want to have a tree with buckets in inner node positions, only in leaves.
+ addNodesToBucketDB(BucketId(17, 1), "0=250/50/300,1=250/50/300");
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1, _e2, _e3});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e1, _e2, _e3});
+
+ // Grab a lock on e2 to simulate a concurrent write to the document.
+ auto e2_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e2.id);
+ ASSERT_TRUE(e2_lock.valid());
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ // Locks on e1 and e3 are held while GC removes are sent
+ auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id);
+ EXPECT_FALSE(e1_lock.valid());
+ auto e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id);
+ EXPECT_FALSE(e3_lock.valid());
+
+ std::vector<spi::IdAndTimestamp> expected({_e1, _e3}); // e2 not included in remove set
+ for (int i : {0, 1}) {
+ auto cmd = as_remove_location_command(_sender.command(i));
+ EXPECT_EQ(cmd->explicit_remove_set(), expected);
+ }
+
+ // Locks are implicitly released when the underlying operation is destroyed
+ op.reset();
+ e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id);
+ EXPECT_TRUE(e1_lock.valid());
+ e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id);
+ EXPECT_TRUE(e3_lock.valid());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 014827cb03c..19ec51f4ed4 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -2511,6 +2511,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_
for (auto* s : stripes) {
for (uint16_t i : {0, 1, 2}) {
EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).two_phase_remove_location);
}
}
@@ -2518,10 +2519,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_
for (uint32_t i = 0; i < _sender.commands().size(); i++) {
ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
dummy_buckets_to_return, [i](auto& reply) noexcept {
- // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
+ // Pretend nodes 1 and 2 are on a shiny version with support for new features.
// Node 0 does not support the fanciness.
if (i > 0) {
reply.supported_node_features().unordered_merge_chaining = true;
+ reply.supported_node_features().two_phase_remove_location = true;
}
}));
}
@@ -2529,8 +2531,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_
// Node features should be propagated to all stripes
for (auto* s : stripes) {
EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).two_phase_remove_location);
EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).two_phase_remove_location);
EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).two_phase_remove_location);
}
}
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index a02167bb08a..04ab5ad0cf4 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -10,6 +10,7 @@
#include <vespa/document/fieldvalue/intfieldvalue.h>
using document::test::makeDocumentBucket;
+using document::DocumentId;
using namespace ::testing;
namespace storage {
@@ -32,7 +33,7 @@ TEST_F(ProcessAllHandlerTest, change_of_repos_is_reflected) {
EXPECT_EQ(newDocRepo.get(), &getEnv().getDocumentTypeRepo());
}
-TEST_F(ProcessAllHandlerTest, remove_location) {
+TEST_F(ProcessAllHandlerTest, legacy_remove_location) {
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
doPut(4, spi::Timestamp(2345));
@@ -54,7 +55,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
EXPECT_EQ(2u, reply->documents_removed());
}
-TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
+TEST_F(ProcessAllHandlerTest, legacy_remove_location_document_subset) {
document::BucketId bucketId(16, 4);
AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory);
@@ -89,6 +90,95 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
EXPECT_EQ(5u, reply->documents_removed());
}
+TEST_F(ProcessAllHandlerTest, remove_location_with_enumerate_only_returns_match_set_only) {
+ document::BucketId bucketId(16, 4);
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier,
+ *_sequenceTaskExecutor, _bucketIdFactory);
+
+ document::TestDocMan docMan;
+ for (int i = 0; i < 10; ++i) {
+ document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
+ doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
+ }
+
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
+ cmd->set_only_enumerate_docs(true);
+ auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ // Enumeration is synchronous, so we get the reply in the _tracker_, not on the reply queue.
+ ASSERT_TRUE(tracker->hasReply());
+ auto* reply = dynamic_cast<api::RemoveLocationReply*>(&tracker->getReply());
+ ASSERT_TRUE(reply);
+ EXPECT_EQ(0u, reply->documents_removed());
+
+ // No docs should be removed (remove flag is all zero)
+ EXPECT_EQ("DocEntry(100, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n"
+ "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
+ "DocEntry(102, 0, Doc(id:mail:testdoctype1:n=4:62608.html))\n"
+ "DocEntry(103, 0, Doc(id:mail:testdoctype1:n=4:26566.html))\n"
+ "DocEntry(104, 0, Doc(id:mail:testdoctype1:n=4:56061.html))\n"
+ "DocEntry(105, 0, Doc(id:mail:testdoctype1:n=4:20019.html))\n"
+ "DocEntry(106, 0, Doc(id:mail:testdoctype1:n=4:49514.html))\n"
+ "DocEntry(107, 0, Doc(id:mail:testdoctype1:n=4:13472.html))\n"
+ "DocEntry(108, 0, Doc(id:mail:testdoctype1:n=4:42967.html))\n"
+ "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n",
+ dumpBucket(bucketId));
+
+ std::vector<spi::IdAndTimestamp> expected = {
+ {DocumentId("id:mail:testdoctype1:n=4:3619.html"), spi::Timestamp(100)},
+ {DocumentId("id:mail:testdoctype1:n=4:62608.html"), spi::Timestamp(102)},
+ {DocumentId("id:mail:testdoctype1:n=4:56061.html"), spi::Timestamp(104)},
+ {DocumentId("id:mail:testdoctype1:n=4:49514.html"), spi::Timestamp(106)},
+ {DocumentId("id:mail:testdoctype1:n=4:42967.html"), spi::Timestamp(108)},
+ };
+ EXPECT_EQ(reply->selection_matches(), expected);
+}
+
+TEST_F(ProcessAllHandlerTest, remove_location_with_remove_set_only_removes_listed_docs) {
+ document::BucketId bucketId(16, 4);
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier,
+ *_sequenceTaskExecutor, _bucketIdFactory);
+
+ document::TestDocMan docMan;
+ for (int i = 0; i < 10; ++i) {
+ document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
+ doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
+ }
+
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ // Use a selection that, if naively used, removes everything.
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("true", bucket);
+ std::vector<spi::IdAndTimestamp> to_remove = {
+ {DocumentId("id:mail:testdoctype1:n=4:62608.html"), spi::Timestamp(102)},
+ {DocumentId("id:mail:testdoctype1:n=4:49514.html"), spi::Timestamp(106)},
+ {DocumentId("id:mail:testdoctype1:n=4:42967.html"), spi::Timestamp(108)},
+ };
+ cmd->set_explicit_remove_set(std::move(to_remove));
+ auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ // Actually removing the documents is asynchronous, so the response will be on the queue.
+ std::shared_ptr<api::StorageMessage> msg;
+ ASSERT_TRUE(_replySender.queue.getNext(msg, 60s));
+
+ // Remove flag toggled for the entries provided in the command
+ EXPECT_EQ("DocEntry(100, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n"
+ "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
+ "DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n"
+ "DocEntry(103, 0, Doc(id:mail:testdoctype1:n=4:26566.html))\n"
+ "DocEntry(104, 0, Doc(id:mail:testdoctype1:n=4:56061.html))\n"
+ "DocEntry(105, 0, Doc(id:mail:testdoctype1:n=4:20019.html))\n"
+ "DocEntry(106, 1, id:mail:testdoctype1:n=4:49514.html)\n"
+ "DocEntry(107, 0, Doc(id:mail:testdoctype1:n=4:13472.html))\n"
+ "DocEntry(108, 1, id:mail:testdoctype1:n=4:42967.html)\n"
+ "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n",
+ dumpBucket(bucketId));
+
+ auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg);
+ ASSERT_TRUE(reply);
+ EXPECT_EQ(3u, reply->documents_removed());
+}
+
TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_type) {
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
diff --git a/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE b/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE
deleted file mode 100644
index 5045a98b037..00000000000
--- a/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE
+++ /dev/null
@@ -1,66 +0,0 @@
-
-MessageType(10, Put)
-\00\00\00
-\00\00\00j\00\08\00\00\00ddoc:test:test\00\05testdoctype1\00\00\01\00\00\00=\00\01\05\00= ;This is the contents of the test document.
-It ain't much.
-\00@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(11, Put Reply, reply of Put)
-\00\00\00\0b\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(82, Update)
-\00\00\00R\00\00\005\00\08doc:test:test\00\01testdoctype1\00\00\01\00\00\00\01\00\00\00\02\00\00\00\01\00\00\10\1b\01\00\00\00\11@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00
-\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(83, Update Reply, reply of Update)
-\00\00\00S\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00\08\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(4, Get)
-\00\00\00\04\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00{\01\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(5, Get Reply, reply of Get)
-\00\00\00\05\00\00\00j\00\08\00\00\00ddoc:test:test\00\05testdoctype1\00\00\01\00\00\00=\00\01\05\00= ;This is the contents of the test document.
-It ain't much.
-\00\00\00\00\00\00\00\00d\00\00\00\0ddoc:test:test\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(12, Remove)
-\00\00\00\0c\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\9f\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(13, Remove Reply, reply of Remove)
-\00\00\00\0d\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\000\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(14, Revert)
-\00\00\00\0e@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00;\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(15, Revert Reply, reply of Revert)
-\00\00\00\0f@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(54, Request bucket info)
-\00\00\006\00\00\00\00\00\03\00\00\00\14distributor:3 .1.s:d\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(55, Request bucket info reply, reply of Request bucket info)
-\00\00\007\00\00\00\01\00\00\00\00\00\00\00\04\00\00\00+\00\00\00\18\00\00\00{\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(56, Notify bucket change)
-\00\00\008P\00\00\00\00\00\03\e8\00\00\00\02\00\00\00\03\00\00\00\04\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(57, Notify bucket change reply, reply of Notify bucket change)
-\00\00\009\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(26, Create bucket)
-\00\00\00\1a\00\00\00\00\00\00\02o\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(27, Create bucket reply, reply of Create bucket)
-\00\00\00\1b\00\00\00\00\00\00\02o\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(34, Delete bucket)
-\00\00\00"\00\00\00\00\00\00\02o\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(35, Delete bucket reply, reply of Delete bucket)
-\00\00\00#\00\00\00\00\00\00\02o\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(32, Merge bucket)
-\00\00\00 \00\00\00\00\00\00\02o\00\03\00\04\00\00\0d\01\00\1a\01\00\00\00\00\00\00\04\d2\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(33, Merge bucket reply, reply of Merge bucket)
-\00\00\00!\00\00\00\00\00\00\02o\00\03\00\04\00\00\0d\01\00\1a\01\00\00\00\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\04\00\00\00
-\00\00\00d
-MessageType(50, GetBucketDiff)
-\00\00\002\00\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\00\00\00\00\04 \00\00\00\01\00\00\00\00\00\01\e2@\00\0c1234567890ab\00\00\00d\00\01\00\00\00\01\00\03\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(51, GetBucketDiff reply, reply of GetBucketDiff)
-\00\00\003\00\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\00\00\00\00\04 \00\00\00\01\00\00\00\00\00\01\e2@\00\0c1234567890ab\00\00\00d\00\01\00\00\00\01\00\03\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(52, ApplyBucketDiff)
-\00\00\004@\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\0c\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(53, ApplyBucketDiff reply, reply of ApplyBucketDiff)
-\00\00\005@\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\0c\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(66, SplitBucket)
-\00\00\00B@\00\00\00\00\00\00\00\14(\00\00\03\e8\00\00\00\05\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(67, SplitBucket reply, reply of SplitBucket)
-\00\00\00C@\00\00\00\00\00\00\00\00\00\00\02D\00\00\00\00\00\00\00\00\00\00d\00\00\03\e8\00\00'\10D\00\00\00\00\00\00\01\00\00\00e\00\00\03\e9\00\00'\11\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(18, Visitor Create)
-\00\00\00\12\00\00\00\07library\00\00\00\02id\00\00\00\0ddoc selection\00\00\00\01\00\00\00\0bcontroldest\00\00\00\08datadest\00\00\00\02\00\00\00\00\00\00\00{\00\00\00\00\00\00\01\c8\00\00\00\02@\00\00\00\00\00\00\01@\00\00\00\00\00\00\02\00\01\01\00\00\00d\00\00\00\03\00\00\00\0dinto darkness\00\00\00\09bind them\00\00\00\08one ring\00\00\00\10to rule them all\00\00\00\0bone ring to\00\00\00\0dfind them and\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(19, Visitor Create Reply, reply of Visitor Create)
-\00\00\00\13\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-It ain't much.
-\00\00P\00\00\f1\f1\f1\f1\f1\00\00\00\00\00\00\00\00\01\ff\ff
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index d4d22d6a36c..5ed2e0d96b4 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -387,11 +387,8 @@ TEST_P(StorageProtocolTest, request_bucket_info) {
// separately until we can figure out if this is by design or not.
EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
- if (GetParam().getMajor() >= 7) {
- EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
- } else {
- EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining);
- }
+ EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
+ EXPECT_TRUE(reply2->supported_node_features().two_phase_remove_location);
}
}
@@ -530,16 +527,57 @@ TEST_P(StorageProtocolTest, destroy_visitor) {
auto reply2 = copyReply(reply);
}
-TEST_P(StorageProtocolTest, remove_location) {
+TEST_P(StorageProtocolTest, legacy_remove_location) {
+ auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_TRUE(cmd2->explicit_remove_set().empty());
+ EXPECT_FALSE(cmd2->only_enumerate_docs());
+
+ uint32_t n_docs_removed = 12345;
+ auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed);
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(n_docs_removed, reply2->documents_removed());
+ EXPECT_TRUE(reply2->selection_matches().empty());
+}
+
+TEST_P(StorageProtocolTest, phase_1_remove_location) {
+ auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ cmd->set_only_enumerate_docs(true);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_TRUE(cmd2->explicit_remove_set().empty());
+ EXPECT_TRUE(cmd2->only_enumerate_docs());
+
+ auto reply = std::make_shared<RemoveLocationReply>(*cmd2, 0);
+ std::vector<spi::IdAndTimestamp> docs;
+ docs.emplace_back(DocumentId("id:foo:bar::baz"), spi::Timestamp(12345));
+ docs.emplace_back(DocumentId("id:foo:bar::zoid"), spi::Timestamp(67890));
+ reply->set_selection_matches(docs);
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(0, reply2->documents_removed());
+ EXPECT_EQ(reply2->selection_matches(), docs);
+}
+
+TEST_P(StorageProtocolTest, phase_2_remove_location) {
auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ std::vector<spi::IdAndTimestamp> docs;
+ docs.emplace_back(DocumentId("id:foo:bar::baz"), spi::Timestamp(12345));
+ docs.emplace_back(DocumentId("id:foo:bar::zoid"), spi::Timestamp(67890));
+ cmd->set_explicit_remove_set(docs);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_FALSE(cmd2->only_enumerate_docs());
+ EXPECT_EQ(cmd2->explicit_remove_set(), docs);
uint32_t n_docs_removed = 12345;
auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed);
auto reply2 = copyReply(reply);
EXPECT_EQ(n_docs_removed, reply2->documents_removed());
+ EXPECT_TRUE(reply2->selection_matches().empty());
}
TEST_P(StorageProtocolTest, stat_bucket) {
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index b4c23725493..c83c5cdd245 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -52,6 +52,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_implicitly_clear_priority_on_schedule(false),
_use_unordered_merge_chaining(false),
_inhibit_default_merges_when_global_merges_pending(false),
+ _enable_two_phase_garbage_collection(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -175,6 +176,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule;
_use_unordered_merge_chaining = config.useUnorderedMergeChaining;
_inhibit_default_merges_when_global_merges_pending = config.inhibitDefaultMergesWhenGlobalMergesPending;
+ _enable_two_phase_garbage_collection = config.enableTwoPhaseGarbageCollection;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index b26f115827e..a18770ac69f 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -119,6 +119,8 @@ public:
const MaintenancePriorities& getMaintenancePriorities() const {
return _maintenancePriorities;
}
+
+ uint8_t default_external_feed_priority() const noexcept { return 120; }
/**
@see setSplitCount
@@ -279,6 +281,12 @@ public:
[[nodiscard]] bool inhibit_default_merges_when_global_merges_pending() const noexcept {
return _inhibit_default_merges_when_global_merges_pending;
}
+ void set_enable_two_phase_garbage_collection(bool enable) noexcept {
+ _enable_two_phase_garbage_collection = enable;
+ }
+ [[nodiscard]] bool enable_two_phase_garbage_collection() const noexcept {
+ return _enable_two_phase_garbage_collection;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -338,6 +346,7 @@ private:
bool _implicitly_clear_priority_on_schedule;
bool _use_unordered_merge_chaining;
bool _inhibit_default_merges_when_global_merges_pending;
+ bool _enable_two_phase_garbage_collection;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 0a858ba37c3..e363f53a4ea 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -299,3 +299,11 @@ use_unordered_merge_chaining bool default=true
## one or more nodes is in maintenance mode in the default bucket space but marked up in
## the global bucket space.
inhibit_default_merges_when_global_merges_pending bool default=true
+
+## If true, garbage collection is performed in two phases (metadata gathering and deletion)
+## instead of just a single phase. Two-phase GC allows for ensuring the same set of documents
+## is deleted across all nodes and explicitly takes write locks on the distributor to prevent
+## concurrent feed ops to GC'd documents from potentially creating inconsistencies.
+## Two-phase GC is only used iff all replica content nodes support the feature AND it's enabled
+## by this config.
+enable_two_phase_garbage_collection bool default=false
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 49b062d6cc1..d1366bc0285 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -93,6 +93,10 @@ public:
return *_operation_sequencer;
}
+ OperationSequencer& operation_sequencer() noexcept override {
+ return *_operation_sequencer;
+ }
+
const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
/**
diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h
index 198427078d6..ef0252661f3 100644
--- a/storage/src/vespa/storage/distributor/distributormessagesender.h
+++ b/storage/src/vespa/storage/distributor/distributormessagesender.h
@@ -29,6 +29,7 @@ public:
virtual PendingMessageTracker& getPendingMessageTracker() = 0;
virtual const PendingMessageTracker& getPendingMessageTracker() const = 0;
virtual const OperationSequencer& operation_sequencer() const noexcept = 0;
+ virtual OperationSequencer& operation_sequencer() noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 9ce8d871fc3..0ada9da29c1 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -63,6 +63,9 @@ public:
const OperationSequencer& operation_sequencer() const noexcept override {
abort(); // Never called by the messages using this component.
}
+ OperationSequencer& operation_sequencer() noexcept override {
+ abort(); // Never called by the messages using this component.
+ }
};
ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ctx,
diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h
index 647e063f93e..87b2a9aef8e 100644
--- a/storage/src/vespa/storage/distributor/node_supported_features.h
+++ b/storage/src/vespa/storage/distributor/node_supported_features.h
@@ -11,11 +11,10 @@ namespace storage::distributor {
* are initially expected to be unsupported.
*/
struct NodeSupportedFeatures {
- bool unordered_merge_chaining = false;
+ bool unordered_merge_chaining = false;
+ bool two_phase_remove_location = false;
- bool operator==(const NodeSupportedFeatures& rhs) const noexcept {
- return unordered_merge_chaining == rhs.unordered_merge_chaining;
- };
+ bool operator==(const NodeSupportedFeatures& rhs) const noexcept = default;
};
}
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index d4afdf2cce7..c43d9f05fc9 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -55,6 +55,10 @@ public:
return _sender.operation_sequencer();
}
+ OperationSequencer& operation_sequencer() noexcept override {
+ return _sender.operation_sequencer();
+ }
+
private:
OperationOwner& _owner;
DistributorStripeMessageSender& _sender;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index e91df1da51e..74ff8371fbe 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -36,6 +36,8 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
{
}
+PutOperation::~PutOperation() = default;
+
void
PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
const api::StorageCommand& originalCommand,
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 61e4678496f..7ec81d6570d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -29,6 +29,7 @@ public:
std::shared_ptr<api::PutCommand> msg,
PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle = SequencingHandle());
+ ~PutOperation() override;
void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "put"; };
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index af3b727e07b..7c04636dc50 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -97,6 +97,10 @@ struct IntermediateMessageSender : DistributorStripeMessageSender {
const OperationSequencer& operation_sequencer() const noexcept override {
return forward.operation_sequencer();
}
+
+ OperationSequencer& operation_sequencer() noexcept override {
+ return forward.operation_sequencer();
+ }
};
IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 7f66d1effd5..a30be1293cd 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -5,44 +5,110 @@
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/vespalib/stllike/hash_set.hpp>
+#include <algorithm>
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.operation.idealstate.remove");
+LOG_SETUP(".distributor.operation.idealstate.gc");
namespace storage::distributor {
GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& cluster_ctx, const BucketAndNodes& nodes)
: IdealStateOperation(nodes),
_tracker(cluster_ctx),
+ _phase(Phase::NotStarted),
+ _cluster_state_version_at_phase1_start_time(0),
+ _phase1_replies_received(0),
+ _remove_candidate_set(),
_replica_info(),
- _max_documents_removed(0)
+ _max_documents_removed(0),
+ _is_done(false)
{}
GarbageCollectionOperation::~GarbageCollectionOperation() = default;
-void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) {
+const char* GarbageCollectionOperation::to_string(Phase phase) noexcept {
+ switch (phase) {
+ case Phase::NotStarted: return "NotStarted";
+ case Phase::LegacySinglePhase: return "LegacySinglePhase";
+ case Phase::ReadMetadataPhase: return "ReadMetadataPhase";
+ case Phase::WriteRemovesPhase: return "WriteRemovesPhase";
+ default: abort();
+ }
+}
+
+bool GarbageCollectionOperation::all_involved_nodes_support_two_phase_gc() const noexcept {
+ const auto& features_repo = _manager->operation_context().node_supported_features_repo();
+ for (uint16_t node : getNodes()) {
+ if (!features_repo.node_supported_features(node).two_phase_remove_location) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::vector<spi::IdAndTimestamp> GarbageCollectionOperation::compile_phase_two_send_set() const {
+ std::vector<spi::IdAndTimestamp> docs_to_remove(_remove_candidate_set.begin(), _remove_candidate_set.end());
+ // Use timestamp order to provide test determinism and allow for backend linear merging (if needed).
+ // Tie-break on GID upon collisions (which technically should never happen...!)
+ auto ts_then_gid_order = [](const spi::IdAndTimestamp& lhs, const spi::IdAndTimestamp& rhs) noexcept {
+ if (lhs.timestamp != rhs.timestamp) {
+ return (lhs.timestamp < rhs.timestamp);
+ }
+ return (lhs.id.getGlobalId() < rhs.id.getGlobalId());
+ };
+ std::sort(docs_to_remove.begin(), docs_to_remove.end(), ts_then_gid_order);
+ return docs_to_remove;
+}
+
+void GarbageCollectionOperation::send_current_phase_remove_locations(DistributorStripeMessageSender& sender) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
std::vector<uint16_t> nodes = entry->getNodes();
+ auto docs_to_remove = compile_phase_two_send_set(); // Always empty unless in phase 2 of two-phase GC
- for (auto node : nodes) {
+ for (size_t i = 0; i < nodes.size(); ++i) {
auto command = std::make_shared<api::RemoveLocationCommand>(
_manager->operation_context().distributor_config().getGarbageCollectionSelection(),
getBucket());
-
- command->setPriority(_priority);
- _tracker.queueCommand(command, node);
+ if (_phase == Phase::ReadMetadataPhase) {
+ command->set_only_enumerate_docs(true);
+ } else if (_phase == Phase::WriteRemovesPhase) {
+ if (i < nodes.size() - 1) {
+ command->set_explicit_remove_set(docs_to_remove);
+ } else {
+ command->set_explicit_remove_set(std::move(docs_to_remove));
+ }
+ } // else: legacy command
+ command->setPriority((_phase != Phase::WriteRemovesPhase)
+ ? _priority
+ : _manager->operation_context().distributor_config().default_external_feed_priority());
+ _tracker.queueCommand(command, nodes[i]);
}
-
_tracker.flushQueue(sender);
+}
+void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) {
+ if (_manager->operation_context().distributor_config().enable_two_phase_garbage_collection() &&
+ all_involved_nodes_support_two_phase_gc())
+ {
+ _cluster_state_version_at_phase1_start_time = _bucketSpace->getClusterState().getVersion();
+ LOG(debug, "Starting first phase of two-phase GC for %s at cluster state version %u",
+ getBucket().toString().c_str(), _cluster_state_version_at_phase1_start_time);
+ transition_to(Phase::ReadMetadataPhase);
+ } else {
+ LOG(debug, "Starting legacy single-phase GC for %s", getBucket().toString().c_str());
+ transition_to(Phase::LegacySinglePhase);
+ }
+ send_current_phase_remove_locations(sender);
if (_tracker.finished()) {
done();
}
}
void
-GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&,
+GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply)
{
auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get());
@@ -51,33 +117,154 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&,
uint16_t node = _tracker.handleReply(*rep);
if (!rep->getResult().failed()) {
- _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
- node, rep->getBucketInfo());
- _max_documents_removed = std::max(_max_documents_removed, rep->documents_removed());
+ if (_phase == Phase::LegacySinglePhase) {
+ handle_ok_legacy_reply(node, *rep);
+ } else if (_phase == Phase::ReadMetadataPhase) {
+ handle_ok_phase1_reply(*rep);
+ } else {
+ assert(_phase == Phase::WriteRemovesPhase);
+ handle_ok_phase2_reply(node, *rep);
+ }
} else {
_ok = false;
}
if (_tracker.finished()) {
+ const bool op_complete = (!_ok || _phase == Phase::LegacySinglePhase || _phase == Phase::WriteRemovesPhase);
if (_ok) {
- merge_received_bucket_info_into_db();
+ if (op_complete) {
+ merge_received_bucket_info_into_db();
+ } else {
+ assert(_phase == Phase::ReadMetadataPhase);
+ on_metadata_read_phase_done(sender);
+ }
+ }
+ if (op_complete) {
+ mark_operation_complete();
}
- update_gc_metrics();
- done();
}
}
-void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
- // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
+ _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
+ from_node, reply.getBucketInfo());
+ _max_documents_removed = std::max(_max_documents_removed, reply.documents_removed());
+}
+
+void GarbageCollectionOperation::handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
+ update_replica_response_info_from_reply(from_node, reply);
+}
+
+GarbageCollectionOperation::RemoveCandidateSet
+GarbageCollectionOperation::steal_selection_matches_as_set(api::RemoveLocationReply& reply) {
+ auto candidates = reply.steal_selection_matches();
+ RemoveCandidateSet set;
+ set.resize(candidates.size());
+ for (auto& cand : candidates) {
+ set.insert(std::move(cand));
+ }
+ return set;
+}
+
+void GarbageCollectionOperation::handle_ok_phase1_reply(api::RemoveLocationReply& reply) {
+ assert(reply.documents_removed() == 0);
+ if (_phase1_replies_received == 0) {
+ // Establish baseline candidate set. Since we require an intersection between all
+ // sets, the number of candidates can never be _greater_ than that of the first reply.
+ _remove_candidate_set = steal_selection_matches_as_set(reply);
+ } else if (!_remove_candidate_set.empty()) {
+ auto their_set = steal_selection_matches_as_set(reply);
+ std::vector<spi::IdAndTimestamp> to_remove;
+ for (auto& our_cand : _remove_candidate_set) {
+ if (!their_set.contains(our_cand)) {
+ to_remove.emplace_back(our_cand);
+ }
+ }
+ for (auto& rm_entry : to_remove) {
+ _remove_candidate_set.erase(rm_entry);
+ }
+ }
+ ++_phase1_replies_received;
+}
+
+void GarbageCollectionOperation::handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
+ update_replica_response_info_from_reply(from_node, reply);
+}
+
+bool GarbageCollectionOperation::may_start_write_phase() const {
+ if (!_ok) {
+ return false; // Already broken, no reason to proceed.
+ }
+ const auto state_version_now = _bucketSpace->getClusterState().getVersion();
+ if ((state_version_now != _cluster_state_version_at_phase1_start_time) ||
+ _bucketSpace->has_pending_cluster_state())
+ {
+ LOG(debug, "GC(%s): not sending write phase; cluster state has changed, or a change is pending",
+ getBucket().toString().c_str());
+ return false;
+ }
+ // If bucket is gone, or has become inconsistently split, abort mission.
+ std::vector<BucketDatabase::Entry> entries;
+ _bucketSpace->getBucketDatabase().getAll(getBucketId(), entries);
+ if ((entries.size() != 1) || (entries[0].getBucketId() != getBucketId())) {
+ LOG(debug, "GC(%s): not sending write phase; bucket has become inconsistent",
+ getBucket().toString().c_str());
+ return false;
+ }
+ return true;
+}
+
+void GarbageCollectionOperation::on_metadata_read_phase_done(DistributorStripeMessageSender& sender) {
+ if (!may_start_write_phase()) {
+ _ok = false;
+ mark_operation_complete();
+ return;
+ }
+ std::vector<spi::IdAndTimestamp> already_pending_write;
+ for (auto& cand : _remove_candidate_set) {
+ auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.id);
+ if (maybe_seq_token.valid()) {
+ _gc_write_locks.emplace_back(std::move(maybe_seq_token));
+ LOG(spam, "GC(%s): acquired write lock for '%s'; adding to GC set",
+ getBucket().toString().c_str(), cand.id.toString().c_str());
+ } else {
+ already_pending_write.emplace_back(cand);
+ LOG(spam, "GC(%s): failed to acquire write lock for '%s'; not including in GC set",
+ getBucket().toString().c_str(), cand.id.toString().c_str());
+ }
+ }
+ for (auto& rm_entry : already_pending_write) {
+ _remove_candidate_set.erase(rm_entry);
+ }
+ if (_remove_candidate_set.empty()) {
+ update_last_gc_timestamp_in_db(); // Nothing to remove now, try again later.
+ mark_operation_complete();
+ return;
+ }
+ LOG(debug, "GC(%s): Sending phase 2 GC with %zu entries (with acquired write locks). "
+ "%zu documents had pending writes and could not be GCd at this time",
+ getBucket().toString().c_str(), _remove_candidate_set.size(), already_pending_write.size());
+ transition_to(Phase::WriteRemovesPhase);
+ send_current_phase_remove_locations(sender);
+}
+
+void GarbageCollectionOperation::update_last_gc_timestamp_in_db() {
BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
if (dbentry.valid()) {
dbentry->setLastGarbageCollectionTime(
_manager->node_context().clock().getTimeInSeconds().getTime());
+ LOG(debug, "GC(%s): Tagging bucket completed at time %u",
+ getBucket().toString().c_str(), dbentry->getLastGarbageCollectionTime());
_bucketSpace->getBucketDatabase().update(dbentry);
}
}
+void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
+ // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
+ _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+ update_last_gc_timestamp_in_db();
+}
+
void GarbageCollectionOperation::update_gc_metrics() {
auto metric_base = _manager->getMetrics().operations[IdealStateOperation::GARBAGE_COLLECTION];
auto gc_metrics = std::dynamic_pointer_cast<GcMetricSet>(metric_base);
@@ -85,6 +272,21 @@ void GarbageCollectionOperation::update_gc_metrics() {
gc_metrics->documents_removed.inc(_max_documents_removed);
}
+void GarbageCollectionOperation::mark_operation_complete() {
+ assert(!_is_done);
+ if (_ok) {
+ update_gc_metrics();
+ }
+ done();
+ _is_done = true;
+}
+
+void GarbageCollectionOperation::transition_to(Phase new_phase) {
+ LOG(spam, "GC(%s): state transition %s -> %s",
+ getBucket().toString().c_str(), to_string(_phase), to_string(new_phase));
+ _phase = new_phase;
+}
+
bool
GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const {
return true;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index f51739242b7..adbbd210877 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -4,14 +4,16 @@
#include "idealstateoperation.h"
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
+#include <vespa/vespalib/stllike/hash_set.h>
#include <vector>
namespace storage::distributor {
class PendingMessageTracker;
-class GarbageCollectionOperation : public IdealStateOperation
-{
+class GarbageCollectionOperation final : public IdealStateOperation {
public:
GarbageCollectionOperation(const ClusterContext& cluster_ctx,
const BucketAndNodes& nodes);
@@ -22,15 +24,51 @@ public:
const char* getName() const override { return "garbagecollection"; };
Type getType() const override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
+ bool is_two_phase() const noexcept {
+ return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
+ }
+ bool is_done() const noexcept { return _is_done; }
protected:
MessageTracker _tracker;
private:
- std::vector<BucketCopy> _replica_info;
- uint32_t _max_documents_removed;
+ enum class Phase {
+ NotStarted,
+ LegacySinglePhase,
+ ReadMetadataPhase,
+ WriteRemovesPhase
+ };
+ static const char* to_string(Phase phase) noexcept;
+
+ using RemoveCandidateSet = vespalib::hash_set<spi::IdAndTimestamp, spi::IdAndTimestamp::hash>;
+
+ Phase _phase;
+ uint32_t _cluster_state_version_at_phase1_start_time;
+ uint32_t _phase1_replies_received;
+ RemoveCandidateSet _remove_candidate_set;
+ std::vector<SequencingHandle> _gc_write_locks;
+ std::vector<BucketCopy> _replica_info;
+ uint32_t _max_documents_removed;
+ bool _is_done;
+
+ static RemoveCandidateSet steal_selection_matches_as_set(api::RemoveLocationReply& reply);
+
+ void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
+ std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
+
+ void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
+ void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
+ void handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
+ void update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
+ void on_metadata_read_phase_done(DistributorStripeMessageSender& sender);
+ [[nodiscard]] bool may_start_write_phase() const;
+ [[nodiscard]] bool all_involved_nodes_support_two_phase_gc() const noexcept;
+ void update_last_gc_timestamp_in_db();
void merge_received_bucket_info_into_db();
void update_gc_metrics();
+ void mark_operation_complete();
+ void transition_to(Phase new_phase);
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index f8f35afe821..7e62506b77f 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -179,8 +179,9 @@ public:
/**
Set the priority we should send messages from this operation with.
*/
- void setPriority(api::StorageMessage::Priority priority)
- { _priority = priority; }
+ void setPriority(api::StorageMessage::Priority priority) noexcept {
+ _priority = priority;
+ }
/**
* Returns true if we are blocked to start this operation given
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 30071d4d962..dadbda60021 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -340,7 +340,8 @@ PendingClusterState::update_node_supported_features_from_reply(uint16_t node, co
{
const auto& src_feat = reply.supported_node_features();
NodeSupportedFeatures dest_feat;
- dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining;
+ dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining;
+ dest_feat.two_phase_remove_location = src_feat.two_phase_remove_location;
// This will overwrite per bucket-space reply, but does not matter since it's independent of bucket space.
_node_features.insert(std::make_pair(node, dest_feat));
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index d5bf733a30c..cc1bd557298 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -406,19 +406,34 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
{
tracker->setMetric(_env._metrics.removeLocation);
- LOG(debug, "RemoveLocation(%s): using selection '%s'",
- cmd.getBucketId().toString().c_str(),
- cmd.getDocumentSelection().c_str());
-
spi::Bucket bucket(cmd.getBucket());
- UnrevertableRemoveEntryProcessor::DocumentIdsAndTimeStamps to_remove;
- UnrevertableRemoveEntryProcessor processor(to_remove);
-
- {
- auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
- BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
- std::make_shared<document::DocIdOnly>(),
- processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ const bool is_legacy = (!cmd.only_enumerate_docs() && cmd.explicit_remove_set().empty());
+ std::vector<spi::IdAndTimestamp> to_remove;
+
+ LOG(debug, "RemoveLocation(%s): using selection '%s' (enumerate only: %s, remove set size: %zu)",
+ bucket.toString().c_str(),
+ cmd.getDocumentSelection().c_str(),
+ (cmd.only_enumerate_docs() ? "yes" : "no"),
+ cmd.explicit_remove_set().size());
+
+ if (is_legacy || cmd.only_enumerate_docs()) {
+ UnrevertableRemoveEntryProcessor processor(to_remove);
+ {
+ auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ std::make_shared<document::DocIdOnly>(),
+ processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ }
+ if (!is_legacy) {
+ LOG(debug, "RemoveLocation(%s): returning 1st phase results with %zu entries",
+ bucket.toString().c_str(), to_remove.size());
+ auto reply = std::make_shared<api::RemoveLocationReply>(cmd, 0); // No docs removed yet
+ reply->set_selection_matches(std::move(to_remove));
+ tracker->setReply(std::move(reply));
+ return tracker;
+ }
+ } else {
+ to_remove = cmd.steal_explicit_remove_set();
}
auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) {
@@ -427,6 +442,8 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
tracker->sendReply();
});
+ // In the case where a _newer_ mutation exists for a given entry in to_remove, it will be ignored
+ // (with no tombstone added) since we only preserve the newest operation for a document.
_spi.removeAsync(bucket, std::move(to_remove),
std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index cb923db3c3c..92c0fdc0b87 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -88,9 +88,26 @@ message RevertResponse {
BucketId remapped_bucket_id = 2;
}
+message IdAndTimestamp {
+ DocumentId id = 1;
+ uint64 timestamp = 2;
+}
+
+message PhaseOneRemove {
+ // Intentionally empty; the mere presence of this message marks the request as a phase 1 (enumerate-only) remove.
+}
+
+message PhaseTwoRemove {
+ repeated IdAndTimestamp explicit_remove_set = 1;
+}
+
message RemoveLocationRequest {
- Bucket bucket = 1;
- bytes document_selection = 2;
+ Bucket bucket = 1;
+ bytes document_selection = 2;
+ oneof phased_remove {
+ PhaseOneRemove phase_one = 3;
+ PhaseTwoRemove phase_two = 4;
+ }
}
message RemoveLocationStats {
@@ -98,7 +115,8 @@ message RemoveLocationStats {
}
message RemoveLocationResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- RemoveLocationStats stats = 3;
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ RemoveLocationStats stats = 3;
+ repeated IdAndTimestamp selection_matches = 4; // Iff reply to phase 1 remove
}
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 7f7ab1d7c0b..a79ac9fd99a 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -110,7 +110,8 @@ message BucketAndBucketInfo {
}
message SupportedNodeFeatures {
- bool unordered_merge_chaining = 1;
+ bool unordered_merge_chaining = 1;
+ bool two_phase_remove_location = 2;
}
message RequestBucketInfoResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index dee766b4b2d..662408c4e95 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -646,30 +646,83 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cm
// RemoveLocation
// -----------------------------------------------------------------
+namespace {
+
+void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) {
+ *dest.mutable_id() = src.toString();
+}
+
+document::DocumentId get_document_id(const protobuf::DocumentId& src) {
+ return document::DocumentId(src.id()); // id() shall always be null terminated
+}
+
+void set_id_and_timestamp_vector(::google::protobuf::RepeatedPtrField<protobuf::IdAndTimestamp>& dest,
+ const std::vector<spi::IdAndTimestamp>& src)
+{
+ dest.Reserve(src.size());
+ for (const auto& src_entry : src) {
+ auto* dest_entry = dest.Add();
+ dest_entry->set_timestamp(src_entry.timestamp);
+ set_document_id(*dest_entry->mutable_id(), src_entry.id);
+ }
+}
+
+std::vector<spi::IdAndTimestamp>
+get_id_and_timestamp_vector(const ::google::protobuf::RepeatedPtrField<protobuf::IdAndTimestamp>& src)
+{
+ std::vector<spi::IdAndTimestamp> vec;
+ vec.reserve(src.size());
+ for (const auto& src_entry : src) {
+ vec.emplace_back(get_document_id(src_entry.id()), spi::Timestamp(src_entry.timestamp()));
+ }
+ return vec;
+}
+
+}
+
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ if (msg.only_enumerate_docs()) {
+ req.mutable_phase_one(); // Instantiating it is enough
+ } else if (!msg.explicit_remove_set().empty()) {
+ set_id_and_timestamp_vector(*req.mutable_phase_two()->mutable_explicit_remove_set(),
+ msg.explicit_remove_set());
+ }
});
}
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, [&](auto& res) {
res.mutable_stats()->set_documents_removed(msg.documents_removed());
+ if (!msg.selection_matches().empty()) {
+ set_id_and_timestamp_vector(*res.mutable_selection_matches(), msg.selection_matches());
+ }
});
}
api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
- return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ auto cmd = std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ if (req.has_phase_one()) {
+ cmd->set_only_enumerate_docs(true);
+ } else if (req.has_phase_two()) {
+ cmd->set_explicit_remove_set(get_id_and_timestamp_vector(req.phase_two().explicit_remove_set()));
+ }
+ return cmd;
});
}
api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&](auto& res) {
uint32_t documents_removed = (res.has_stats() ? res.stats().documents_removed() : 0u);
- return std::make_unique<api::RemoveLocationReply>(
+ auto reply = std::make_unique<api::RemoveLocationReply>(
static_cast<const api::RemoveLocationCommand&>(cmd),
documents_removed);
+ if (!res.selection_matches().empty()) {
+ reply->set_selection_matches(get_id_and_timestamp_vector(res.selection_matches()));
+ }
+ return reply;
});
}
@@ -1004,6 +1057,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe
// We mark features as available at protocol level. Only included for full bucket fetch responses.
if (msg.full_bucket_fetch()) {
res.mutable_supported_node_features()->set_unordered_merge_chaining(true);
+ res.mutable_supported_node_features()->set_two_phase_remove_location(true);
}
});
}
@@ -1044,7 +1098,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
if (res.has_supported_node_features()) {
const auto& src_features = res.supported_node_features();
auto& dest_features = reply->supported_node_features();
- dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+ dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+ dest_features.two_phase_remove_location = src_features.two_phase_remove_location();
}
return reply;
});
diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h
index 47785a92039..d9843236d2e 100644
--- a/storage/src/vespa/storageapi/message/bucket.h
+++ b/storage/src/vespa/storageapi/message/bucket.h
@@ -393,7 +393,8 @@ public:
friend std::ostream& operator<<(std::ostream& os, const Entry&);
};
struct SupportedNodeFeatures {
- bool unordered_merge_chaining = false;
+ bool unordered_merge_chaining = false;
+ bool two_phase_remove_location = false;
};
using EntryVector = vespalib::Array<Entry>;
private:
diff --git a/storage/src/vespa/storageapi/message/removelocation.cpp b/storage/src/vespa/storageapi/message/removelocation.cpp
index 7b7ed894b2c..5d558c5e305 100644
--- a/storage/src/vespa/storageapi/message/removelocation.cpp
+++ b/storage/src/vespa/storageapi/message/removelocation.cpp
@@ -11,15 +11,17 @@ IMPLEMENT_REPLY(RemoveLocationReply)
RemoveLocationCommand::RemoveLocationCommand(vespalib::stringref documentSelection,
const document::Bucket &bucket)
: BucketInfoCommand(MessageType::REMOVELOCATION, bucket),
- _documentSelection(documentSelection)
+ _documentSelection(documentSelection),
+ _explicit_remove_set(),
+ _only_enumerate_docs(false)
{}
-RemoveLocationCommand::~RemoveLocationCommand() {}
+RemoveLocationCommand::~RemoveLocationCommand() = default;
void
RemoveLocationCommand::print(std::ostream& out, bool verbose, const std::string& indent) const
{
- if (_documentSelection.length()) {
+ if (!_documentSelection.empty()) {
out << "Remove selection(" << _documentSelection << "): ";
}
BucketInfoCommand::print(out, verbose, indent);
diff --git a/storage/src/vespa/storageapi/message/removelocation.h b/storage/src/vespa/storageapi/message/removelocation.h
index 276b090cc57..4e7c1d26711 100644
--- a/storage/src/vespa/storageapi/message/removelocation.h
+++ b/storage/src/vespa/storageapi/message/removelocation.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/storageapi/defs.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storageapi/messageapi/storagecommand.h>
#include <vespa/storageapi/messageapi/bucketinforeply.h>
@@ -15,13 +16,34 @@ public:
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
const vespalib::string& getDocumentSelection() const { return _documentSelection; }
+ // TODO move to factory pattern instead to disallow creating illegal combinations
+ void set_only_enumerate_docs(bool only_enumerate) noexcept {
+ _only_enumerate_docs = only_enumerate;
+ }
+ [[nodiscard]] bool only_enumerate_docs() const noexcept {
+ return _only_enumerate_docs;
+ }
+ void set_explicit_remove_set(std::vector<spi::IdAndTimestamp> remove_set) {
+ _explicit_remove_set = std::move(remove_set);
+ }
+ const std::vector<spi::IdAndTimestamp>& explicit_remove_set() const noexcept {
+ return _explicit_remove_set;
+ }
+ std::vector<spi::IdAndTimestamp> steal_explicit_remove_set() const noexcept {
+ return std::move(_explicit_remove_set);
+ }
+
DECLARE_STORAGECOMMAND(RemoveLocationCommand, onRemoveLocation);
private:
+ // TODO consider std::variant; at most one of _explicit_remove_set / _only_enumerate_docs may be set at a time
vespalib::string _documentSelection;
+ std::vector<spi::IdAndTimestamp> _explicit_remove_set;
+ bool _only_enumerate_docs;
};
class RemoveLocationReply : public BucketInfoReply
{
+ std::vector<spi::IdAndTimestamp> _selection_matches; // For use in 1st phase GC
uint32_t _documents_removed;
public:
explicit RemoveLocationReply(const RemoveLocationCommand& cmd, uint32_t docs_removed = 0);
@@ -29,6 +51,16 @@ public:
_documents_removed = docs_removed;
}
uint32_t documents_removed() const noexcept { return _documents_removed; }
+ // TODO refactor
+ void set_selection_matches(std::vector<spi::IdAndTimestamp> matches) noexcept {
+ _selection_matches = std::move(matches);
+ }
+ const std::vector<spi::IdAndTimestamp>& selection_matches() const noexcept {
+ return _selection_matches;
+ }
+ std::vector<spi::IdAndTimestamp> steal_selection_matches() noexcept {
+ return std::move(_selection_matches);
+ }
DECLARE_STORAGEREPLY(RemoveLocationReply, onRemoveLocationReply)
};