aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-07-13 14:41:31 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-08-17 13:42:50 +0000
commit54693fc154c0fabae6ac82607765a22057977bbb (patch)
tree4522fe00d0f72311a1c5b99909ffcce7956fbd88
parentdddd2b3708358da2a855cbbef456c94c985cf08e (diff)
Add support for two-phase document garbage collection
If enabled, garbage collection is performed in two phases (metadata gathering and deletion) instead of just a single phase. Two-phase GC allows for ensuring the same set of documents is deleted across all nodes and explicitly takes write locks on the distributor to prevent concurrent feed ops to GC'd documents from potentially creating inconsistencies. Two-phase GC is only used _iff_ all replica content nodes support the feature _and_ it's enabled in config. An additional field has been added to the feature negotiation functionality to communicate support from content nodes to distributors.
-rw-r--r--persistence/src/vespa/persistence/spi/id_and_timestamp.cpp20
-rw-r--r--persistence/src/vespa/persistence/spi/id_and_timestamp.h10
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h5
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp20
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp335
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp7
-rw-r--r--storage/src/tests/persistence/processalltest.cpp94
-rw-r--r--storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE66
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp50
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h9
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def8
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributormessagesender.h1
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features.h7
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp238
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h46
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h5
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp41
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto28
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto3
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp61
-rw-r--r--storage/src/vespa/storageapi/message/bucket.h3
-rw-r--r--storage/src/vespa/storageapi/message/removelocation.cpp8
-rw-r--r--storage/src/vespa/storageapi/message/removelocation.h32
31 files changed, 985 insertions, 135 deletions
diff --git a/persistence/src/vespa/persistence/spi/id_and_timestamp.cpp b/persistence/src/vespa/persistence/spi/id_and_timestamp.cpp
index fba45990744..03eaf7c9e6e 100644
--- a/persistence/src/vespa/persistence/spi/id_and_timestamp.cpp
+++ b/persistence/src/vespa/persistence/spi/id_and_timestamp.cpp
@@ -1,5 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "id_and_timestamp.h"
+#include <vespa/vespalib/stllike/asciistream.h>
namespace storage::spi {
@@ -14,4 +15,23 @@ IdAndTimestamp& IdAndTimestamp::operator=(const IdAndTimestamp&) = default;
IdAndTimestamp::IdAndTimestamp(IdAndTimestamp&&) noexcept = default;
IdAndTimestamp& IdAndTimestamp::operator=(IdAndTimestamp&&) noexcept = default;
+void IdAndTimestamp::print(vespalib::asciistream& os) const {
+ os << id.toString() << " at time " << timestamp.getValue();
+}
+
+vespalib::string IdAndTimestamp::to_string() const {
+ vespalib::asciistream os;
+ print(os);
+ return os.str();
+}
+
+vespalib::asciistream& operator<<(vespalib::asciistream& os, const IdAndTimestamp& id_ts) {
+ id_ts.print(os);
+ return os;
+}
+std::ostream& operator<<(std::ostream& os, const IdAndTimestamp& id_ts) {
+ os << id_ts.to_string();
+ return os;
+}
+
}
diff --git a/persistence/src/vespa/persistence/spi/id_and_timestamp.h b/persistence/src/vespa/persistence/spi/id_and_timestamp.h
index d8cdba3d063..a45cbfcf5eb 100644
--- a/persistence/src/vespa/persistence/spi/id_and_timestamp.h
+++ b/persistence/src/vespa/persistence/spi/id_and_timestamp.h
@@ -3,6 +3,10 @@
#include "types.h"
#include <vespa/document/base/documentid.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <iosfwd>
+
+namespace vespalib { class asciistream; }
namespace storage::spi {
@@ -27,6 +31,9 @@ struct IdAndTimestamp {
return ((id == rhs.id) && (timestamp == rhs.timestamp));
}
+ void print(vespalib::asciistream&) const;
+ vespalib::string to_string() const;
+
struct hash {
size_t operator()(const IdAndTimestamp& id_ts) const noexcept {
const size_t h = document::GlobalId::hash()(id_ts.id.getGlobalId());
@@ -35,4 +42,7 @@ struct IdAndTimestamp {
};
};
+vespalib::asciistream& operator<<(vespalib::asciistream&, const IdAndTimestamp&);
+std::ostream& operator<<(std::ostream&, const IdAndTimestamp&);
+
}
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index 421fe2216ca..ba552217e07 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -106,6 +106,11 @@ public:
return *_operation_sequencer;
}
+ distributor::OperationSequencer& operation_sequencer() noexcept override {
+ assert(_operation_sequencer);
+ return *_operation_sequencer;
+ }
+
void set_operation_sequencer(distributor::OperationSequencer& op_seq) {
_operation_sequencer = &op_seq;
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 709f2e6cdc5..91b43ebce16 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -191,6 +191,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}
+ void configure_enable_two_phase_garbage_collection(bool use_two_phase) {
+ ConfigBuilder builder;
+ builder.enableTwoPhaseGarbageCollection = use_two_phase;
+ configure_stripe(builder);
+ }
+
bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
@@ -1012,4 +1018,18 @@ TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_
EXPECT_FALSE(getConfig().use_unordered_merge_chaining());
}
+TEST_F(DistributorStripeTest, enable_two_phase_gc_config_is_propagated_to_internal_config)
+{
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ // Feature is currently disabled by default. TODO change once we roll it out.
+ EXPECT_FALSE(getConfig().enable_two_phase_garbage_collection());
+
+ configure_enable_two_phase_garbage_collection(true);
+ EXPECT_TRUE(getConfig().enable_two_phase_garbage_collection());
+
+ configure_enable_two_phase_garbage_collection(false);
+ EXPECT_FALSE(getConfig().enable_two_phase_garbage_collection());
+}
+
}
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 07ef4604e20..4f9fd25098b 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -11,33 +11,83 @@
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
+using document::BucketId;
+using document::DocumentId;
+using document::FixedBucketSpaces;
using namespace ::testing;
namespace storage::distributor {
struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
+ BucketId _bucket_id;
+ OperationSequencer _operation_sequencer;
+ uint32_t _gc_start_time_sec;
+ spi::IdAndTimestamp _e1;
+ spi::IdAndTimestamp _e2;
+ spi::IdAndTimestamp _e3;
+ spi::IdAndTimestamp _e4;
+
+
+ GarbageCollectionOperationTest()
+ : _bucket_id(16, 1),
+ _operation_sequencer(),
+ _gc_start_time_sec(34),
+ _e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)),
+ _e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)),
+ _e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)),
+ _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400))
+ {}
+
void SetUp() override {
createLinks();
- enable_cluster_state("distributor:1 storage:2");
- addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
+ enable_cluster_state("version:10 distributor:1 storage:2");
+ addNodesToBucketDB(_bucket_id, "0=250/50/300,1=250/50/300");
auto cfg = make_config();
cfg->setGarbageCollection("music.date < 34", 3600s);
configure_stripe(cfg);
- getClock().setAbsoluteTimeInSeconds(34);
+ getClock().setAbsoluteTimeInSeconds(_gc_start_time_sec);
+ _sender.set_operation_sequencer(_operation_sequencer);
};
void TearDown() override {
close();
}
+ void enable_two_phase_gc() {
+ NodeSupportedFeatures with_two_phase;
+ with_two_phase.two_phase_remove_location = true;
+ set_node_supported_features(0, with_two_phase);
+ set_node_supported_features(1, with_two_phase);
+
+ config_enable_two_phase_gc(true);
+ }
+
+ void config_enable_two_phase_gc(bool enabled) {
+ auto config = make_config();
+ config->set_enable_two_phase_garbage_collection(enabled);
+ configure_stripe(std::move(config));
+ }
+
std::shared_ptr<GarbageCollectionOperation> create_op() {
auto op = std::make_shared<GarbageCollectionOperation>(
- dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ dummy_cluster_context, BucketAndNodes(makeDocumentBucket(_bucket_id),
toVector<uint16_t>(0, 1)));
op->setIdealStateManager(&getIdealStateManager());
return op;
}
+ static std::shared_ptr<api::RemoveLocationCommand> as_remove_location_command(const std::shared_ptr<api::StorageCommand>& cmd) {
+ auto msg = std::dynamic_pointer_cast<api::RemoveLocationCommand>(cmd);
+ assert(msg);
+ return msg;
+ }
+
+ static std::shared_ptr<api::RemoveLocationReply> make_remove_location_reply(api::StorageCommand& msg) {
+ auto reply = std::shared_ptr<api::StorageReply>(msg.makeReply());
+ assert(reply->getType() == api::MessageType::REMOVELOCATION_REPLY);
+ return std::dynamic_pointer_cast<api::RemoveLocationReply>(reply);
+ }
+
// FIXME fragile to assume that send order == node index, but that's the way it currently works
void reply_to_nth_request(GarbageCollectionOperation& op, size_t n,
uint32_t bucket_info_checksum, uint32_t n_docs_removed) {
@@ -51,8 +101,14 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
op.receive(_sender, reply);
}
+ void assert_bucket_last_gc_timestamp_is(uint32_t gc_time) {
+ BucketDatabase::Entry entry = getBucket(_bucket_id);
+ ASSERT_TRUE(entry.valid());
+ EXPECT_EQ(entry->getLastGarbageCollectionTime(), gc_time);
+ }
+
void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) {
- BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
+ BucketDatabase::Entry entry = getBucket(_bucket_id);
ASSERT_TRUE(entry.valid());
ASSERT_EQ(entry->getNodeCount(), info.size());
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
@@ -69,11 +125,21 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
assert(gc_metrics);
return gc_metrics->documents_removed.getValue();
}
+
+ void assert_gc_op_completed_ok_without_second_phase(GarbageCollectionOperation& op) {
+ ASSERT_EQ(0u, _sender.commands().size());
+ EXPECT_TRUE(op.is_done());
+ EXPECT_TRUE(op.ok()); // It's not a failure to have nothing to do
+ // GC timestamp must be updated so we can move on to another bucket.
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_last_gc_timestamp_is(_gc_start_time_sec));
+ EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed
+ }
};
-TEST_F(GarbageCollectionOperationTest, simple) {
+TEST_F(GarbageCollectionOperationTest, simple_legacy) {
auto op = create_op();
op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_FALSE(op->is_two_phase());
ASSERT_EQ(2, _sender.commands().size());
EXPECT_EQ(0u, gc_removed_documents_metric());
@@ -119,4 +185,261 @@ TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_s
ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34));
}
+TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and_explicit_node_support) {
+ NodeSupportedFeatures with_two_phase;
+ with_two_phase.two_phase_remove_location = true;
+ set_node_supported_features(1, with_two_phase);
+
+ config_enable_two_phase_gc(true);
+
+ // Config enabled, but only 1 node says it supports two-phase RemoveLocation
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_FALSE(op->is_two_phase());
+
+ // Node 0 suddenly upgraded...!
+ set_node_supported_features(0, with_two_phase);
+ op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_TRUE(op->is_two_phase());
+
+ // But doesn't matter if two-phase GC is config-disabled
+ config_enable_two_phase_gc(false);
+
+ op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ EXPECT_FALSE(op->is_two_phase());
+}
+
+TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_locations_with_provided_gc_pri) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->setPriority(getConfig().getMaintenancePriorities().garbageCollection);
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ for (int i : {0, 1}) {
+ auto cmd = as_remove_location_command(_sender.command(i));
+ EXPECT_TRUE(cmd->only_enumerate_docs());
+ EXPECT_EQ(cmd->getPriority(), getConfig().getMaintenancePriorities().garbageCollection);
+ }
+}
+
+TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1, _e2, _e3});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ ASSERT_EQ(0u, _sender.commands().size()); // No phase 2 yet, must get reply from all nodes
+ op->receive(_sender, r2);
+ ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent
+
+ std::vector<spi::IdAndTimestamp> expected({_e2, _e3});
+ for (int i : {0, 1}) {
+ auto cmd = as_remove_location_command(_sender.command(i));
+ EXPECT_FALSE(cmd->only_enumerate_docs());
+ EXPECT_EQ(cmd->explicit_remove_set(), expected);
+ EXPECT_EQ(cmd->getPriority(), getConfig().default_external_feed_priority());
+ }
+}
+
+TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ _sender.commands().clear();
+ // Empty result sets in both replies
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
+}
+
+TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ // No docs in common
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e2});
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
+}
+
+// We explicitly test the case where the first reply has an empty result set since we internally
+// establish the baseline candidate set from the first reply. This test case leaks some internal
+// implementation details, but such is life.
+TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e1, _e2, _e3, _e4});
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
+}
+
+
+TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1, _e2, _e3});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+ ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent
+
+ r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_documents_removed(3);
+ r1->setBucketInfo(api::BucketInfo(0x1234, 90, 500));
+
+ r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_documents_removed(3);
+ r2->setBucketInfo(api::BucketInfo(0x4567, 90, 500));
+
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ EXPECT_TRUE(op->ok());
+ EXPECT_TRUE(op->is_done());
+ EXPECT_EQ(3u, gc_removed_documents_metric());
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(0x1234, 90, 500),
+ api::BucketInfo(0x4567, 90, 500)},
+ _gc_start_time_sec));
+}
+
+struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationTest {
+ std::shared_ptr<GarbageCollectionOperation> _op;
+ std::shared_ptr<api::RemoveLocationReply> _r1;
+ std::shared_ptr<api::RemoveLocationReply> _r2;
+
+ void SetUp() override {
+ GarbageCollectionOperationTest::SetUp();
+
+ enable_two_phase_gc();
+ _op = create_op();
+ _op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ _r1 = make_remove_location_reply(*_sender.command(0));
+ _r1->set_selection_matches({_e1});
+ _r2 = make_remove_location_reply(*_sender.command(1));
+ _r2->set_selection_matches({_e1});
+ }
+
+ void receive_phase1_replies() {
+ _sender.commands().clear();
+ _op->receive(_sender, _r1);
+ _op->receive(_sender, _r2);
+ }
+
+ void receive_phase1_replies_and_assert_no_phase_2_started() {
+ receive_phase1_replies();
+ ASSERT_EQ(0u, _sender.commands().size());
+ EXPECT_TRUE(_op->is_done());
+ EXPECT_FALSE(_op->ok());
+ // GC not completed, so timestamp/bucket DB are _not_ updated
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), // test init values
+ api::BucketInfo(250, 50, 300)},
+ 0/*GC start timestamp*/));
+ EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed
+ }
+};
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_failure_during_first_phase) {
+ _r2->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "oh no"));
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_cluster_state_version_changed_between_phases) {
+ enable_cluster_state("version:11 distributor:1 storage:2"); // version 10 -> 11
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_pending_cluster_state_between_phases) {
+ simulate_set_pending_cluster_state("version:11 distributor:1 storage:2"); // Pending; not enabled yet
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_inconsistently_split_between_phases) {
+ // Add a logical child of _bucket_id to the bucket tree. This implies an inconsistent split, as we never
+ // want to have a tree with buckets in inner node positions, only in leaves.
+ addNodesToBucketDB(BucketId(17, 1), "0=250/50/300,1=250/50/300");
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
+ enable_two_phase_gc();
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
+
+ auto r1 = make_remove_location_reply(*_sender.command(0));
+ r1->set_selection_matches({_e1, _e2, _e3});
+ auto r2 = make_remove_location_reply(*_sender.command(1));
+ r2->set_selection_matches({_e1, _e2, _e3});
+
+ // Grab a lock on e2 to simulate a concurrent write to the document.
+ auto e2_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e2.id);
+ ASSERT_TRUE(e2_lock.valid());
+
+ _sender.commands().clear();
+ op->receive(_sender, r1);
+ op->receive(_sender, r2);
+
+ // Locks on e1 and e3 are held while GC removes are sent
+ auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id);
+ EXPECT_FALSE(e1_lock.valid());
+ auto e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id);
+ EXPECT_FALSE(e3_lock.valid());
+
+ std::vector<spi::IdAndTimestamp> expected({_e1, _e3}); // e2 not included in remove set
+ for (int i : {0, 1}) {
+ auto cmd = as_remove_location_command(_sender.command(i));
+ EXPECT_EQ(cmd->explicit_remove_set(), expected);
+ }
+
+ // Locks are implicitly released when the underlying operation is destroyed
+ op.reset();
+ e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id);
+ EXPECT_TRUE(e1_lock.valid());
+ e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id);
+ EXPECT_TRUE(e3_lock.valid());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 014827cb03c..19ec51f4ed4 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -2511,6 +2511,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_
for (auto* s : stripes) {
for (uint16_t i : {0, 1, 2}) {
EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).two_phase_remove_location);
}
}
@@ -2518,10 +2519,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_
for (uint32_t i = 0; i < _sender.commands().size(); i++) {
ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
dummy_buckets_to_return, [i](auto& reply) noexcept {
- // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
+ // Pretend nodes 1 and 2 are on a shiny version with support for new features.
// Node 0 does not support the fanciness.
if (i > 0) {
reply.supported_node_features().unordered_merge_chaining = true;
+ reply.supported_node_features().two_phase_remove_location = true;
}
}));
}
@@ -2529,8 +2531,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_
// Node features should be propagated to all stripes
for (auto* s : stripes) {
EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).two_phase_remove_location);
EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).two_phase_remove_location);
EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).two_phase_remove_location);
}
}
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index a02167bb08a..04ab5ad0cf4 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -10,6 +10,7 @@
#include <vespa/document/fieldvalue/intfieldvalue.h>
using document::test::makeDocumentBucket;
+using document::DocumentId;
using namespace ::testing;
namespace storage {
@@ -32,7 +33,7 @@ TEST_F(ProcessAllHandlerTest, change_of_repos_is_reflected) {
EXPECT_EQ(newDocRepo.get(), &getEnv().getDocumentTypeRepo());
}
-TEST_F(ProcessAllHandlerTest, remove_location) {
+TEST_F(ProcessAllHandlerTest, legacy_remove_location) {
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
doPut(4, spi::Timestamp(2345));
@@ -54,7 +55,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
EXPECT_EQ(2u, reply->documents_removed());
}
-TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
+TEST_F(ProcessAllHandlerTest, legacy_remove_location_document_subset) {
document::BucketId bucketId(16, 4);
AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory);
@@ -89,6 +90,95 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
EXPECT_EQ(5u, reply->documents_removed());
}
+TEST_F(ProcessAllHandlerTest, remove_location_with_enumerate_only_returns_match_set_only) {
+ document::BucketId bucketId(16, 4);
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier,
+ *_sequenceTaskExecutor, _bucketIdFactory);
+
+ document::TestDocMan docMan;
+ for (int i = 0; i < 10; ++i) {
+ document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
+ doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
+ }
+
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
+ cmd->set_only_enumerate_docs(true);
+ auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ // Enumeration is synchronous, so we get the reply in the _tracker_, not on the reply queue.
+ ASSERT_TRUE(tracker->hasReply());
+ auto* reply = dynamic_cast<api::RemoveLocationReply*>(&tracker->getReply());
+ ASSERT_TRUE(reply);
+ EXPECT_EQ(0u, reply->documents_removed());
+
+ // No docs should be removed (remove flag is all zero)
+ EXPECT_EQ("DocEntry(100, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n"
+ "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
+ "DocEntry(102, 0, Doc(id:mail:testdoctype1:n=4:62608.html))\n"
+ "DocEntry(103, 0, Doc(id:mail:testdoctype1:n=4:26566.html))\n"
+ "DocEntry(104, 0, Doc(id:mail:testdoctype1:n=4:56061.html))\n"
+ "DocEntry(105, 0, Doc(id:mail:testdoctype1:n=4:20019.html))\n"
+ "DocEntry(106, 0, Doc(id:mail:testdoctype1:n=4:49514.html))\n"
+ "DocEntry(107, 0, Doc(id:mail:testdoctype1:n=4:13472.html))\n"
+ "DocEntry(108, 0, Doc(id:mail:testdoctype1:n=4:42967.html))\n"
+ "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n",
+ dumpBucket(bucketId));
+
+ std::vector<spi::IdAndTimestamp> expected = {
+ {DocumentId("id:mail:testdoctype1:n=4:3619.html"), spi::Timestamp(100)},
+ {DocumentId("id:mail:testdoctype1:n=4:62608.html"), spi::Timestamp(102)},
+ {DocumentId("id:mail:testdoctype1:n=4:56061.html"), spi::Timestamp(104)},
+ {DocumentId("id:mail:testdoctype1:n=4:49514.html"), spi::Timestamp(106)},
+ {DocumentId("id:mail:testdoctype1:n=4:42967.html"), spi::Timestamp(108)},
+ };
+ EXPECT_EQ(reply->selection_matches(), expected);
+}
+
+TEST_F(ProcessAllHandlerTest, remove_location_with_remove_set_only_removes_listed_docs) {
+ document::BucketId bucketId(16, 4);
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier,
+ *_sequenceTaskExecutor, _bucketIdFactory);
+
+ document::TestDocMan docMan;
+ for (int i = 0; i < 10; ++i) {
+ document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
+ doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
+ }
+
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ // Use a selection that, if naively used, removes everything.
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("true", bucket);
+ std::vector<spi::IdAndTimestamp> to_remove = {
+ {DocumentId("id:mail:testdoctype1:n=4:62608.html"), spi::Timestamp(102)},
+ {DocumentId("id:mail:testdoctype1:n=4:49514.html"), spi::Timestamp(106)},
+ {DocumentId("id:mail:testdoctype1:n=4:42967.html"), spi::Timestamp(108)},
+ };
+ cmd->set_explicit_remove_set(std::move(to_remove));
+ auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ // Actually removing the documents is asynchronous, so the response will be on the queue.
+ std::shared_ptr<api::StorageMessage> msg;
+ ASSERT_TRUE(_replySender.queue.getNext(msg, 60s));
+
+ // Remove flag toggled for the entries provided in the command
+ EXPECT_EQ("DocEntry(100, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n"
+ "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
+ "DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n"
+ "DocEntry(103, 0, Doc(id:mail:testdoctype1:n=4:26566.html))\n"
+ "DocEntry(104, 0, Doc(id:mail:testdoctype1:n=4:56061.html))\n"
+ "DocEntry(105, 0, Doc(id:mail:testdoctype1:n=4:20019.html))\n"
+ "DocEntry(106, 1, id:mail:testdoctype1:n=4:49514.html)\n"
+ "DocEntry(107, 0, Doc(id:mail:testdoctype1:n=4:13472.html))\n"
+ "DocEntry(108, 1, id:mail:testdoctype1:n=4:42967.html)\n"
+ "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n",
+ dumpBucket(bucketId));
+
+ auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg);
+ ASSERT_TRUE(reply);
+ EXPECT_EQ(3u, reply->documents_removed());
+}
+
TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_type) {
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
diff --git a/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE b/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE
deleted file mode 100644
index 5045a98b037..00000000000
--- a/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE
+++ /dev/null
@@ -1,66 +0,0 @@
-
-MessageType(10, Put)
-\00\00\00
-\00\00\00j\00\08\00\00\00ddoc:test:test\00\05testdoctype1\00\00\01\00\00\00=\00\01\05\00= ;This is the contents of the test document.
-It ain't much.
-\00@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(11, Put Reply, reply of Put)
-\00\00\00\0b\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(82, Update)
-\00\00\00R\00\00\005\00\08doc:test:test\00\01testdoctype1\00\00\01\00\00\00\01\00\00\00\02\00\00\00\01\00\00\10\1b\01\00\00\00\11@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00
-\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(83, Update Reply, reply of Update)
-\00\00\00S\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00\08\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(4, Get)
-\00\00\00\04\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00{\01\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(5, Get Reply, reply of Get)
-\00\00\00\05\00\00\00j\00\08\00\00\00ddoc:test:test\00\05testdoctype1\00\00\01\00\00\00=\00\01\05\00= ;This is the contents of the test document.
-It ain't much.
-\00\00\00\00\00\00\00\00d\00\00\00\0ddoc:test:test\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(12, Remove)
-\00\00\00\0c\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\9f\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(13, Remove Reply, reply of Remove)
-\00\00\00\0d\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\000\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(14, Revert)
-\00\00\00\0e@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00;\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(15, Revert Reply, reply of Revert)
-\00\00\00\0f@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(54, Request bucket info)
-\00\00\006\00\00\00\00\00\03\00\00\00\14distributor:3 .1.s:d\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(55, Request bucket info reply, reply of Request bucket info)
-\00\00\007\00\00\00\01\00\00\00\00\00\00\00\04\00\00\00+\00\00\00\18\00\00\00{\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(56, Notify bucket change)
-\00\00\008P\00\00\00\00\00\03\e8\00\00\00\02\00\00\00\03\00\00\00\04\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(57, Notify bucket change reply, reply of Notify bucket change)
-\00\00\009\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(26, Create bucket)
-\00\00\00\1a\00\00\00\00\00\00\02o\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(27, Create bucket reply, reply of Create bucket)
-\00\00\00\1b\00\00\00\00\00\00\02o\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(34, Delete bucket)
-\00\00\00"\00\00\00\00\00\00\02o\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(35, Delete bucket reply, reply of Delete bucket)
-\00\00\00#\00\00\00\00\00\00\02o\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(32, Merge bucket)
-\00\00\00 \00\00\00\00\00\00\02o\00\03\00\04\00\00\0d\01\00\1a\01\00\00\00\00\00\00\04\d2\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(33, Merge bucket reply, reply of Merge bucket)
-\00\00\00!\00\00\00\00\00\00\02o\00\03\00\04\00\00\0d\01\00\1a\01\00\00\00\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\04\00\00\00
-\00\00\00d
-MessageType(50, GetBucketDiff)
-\00\00\002\00\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\00\00\00\00\04 \00\00\00\01\00\00\00\00\00\01\e2@\00\0c1234567890ab\00\00\00d\00\01\00\00\00\01\00\03\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(51, GetBucketDiff reply, reply of GetBucketDiff)
-\00\00\003\00\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\00\00\00\00\04 \00\00\00\01\00\00\00\00\00\01\e2@\00\0c1234567890ab\00\00\00d\00\01\00\00\00\01\00\03\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(52, ApplyBucketDiff)
-\00\00\004@\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\0c\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(53, ApplyBucketDiff reply, reply of ApplyBucketDiff)
-\00\00\005@\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\0c\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(66, SplitBucket)
-\00\00\00B@\00\00\00\00\00\00\00\14(\00\00\03\e8\00\00\00\05\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(67, SplitBucket reply, reply of SplitBucket)
-\00\00\00C@\00\00\00\00\00\00\00\00\00\00\02D\00\00\00\00\00\00\00\00\00\00d\00\00\03\e8\00\00'\10D\00\00\00\00\00\00\01\00\00\00e\00\00\03\e9\00\00'\11\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-MessageType(18, Visitor Create)
-\00\00\00\12\00\00\00\07library\00\00\00\02id\00\00\00\0ddoc selection\00\00\00\01\00\00\00\0bcontroldest\00\00\00\08datadest\00\00\00\02\00\00\00\00\00\00\00{\00\00\00\00\00\00\01\c8\00\00\00\02@\00\00\00\00\00\00\01@\00\00\00\00\00\00\02\00\01\01\00\00\00d\00\00\00\03\00\00\00\0dinto darkness\00\00\00\09bind them\00\00\00\08one ring\00\00\00\10to rule them all\00\00\00\0bone ring to\00\00\00\0dfind them and\00\00\00\00\00\00\00\00\01\ff\ff
-MessageType(19, Visitor Create Reply, reply of Visitor Create)
-\00\00\00\13\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01
-It ain't much.
-\00\00P\00\00\f1\f1\f1\f1\f1\00\00\00\00\00\00\00\00\01\ff\ff
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index d4d22d6a36c..5ed2e0d96b4 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -387,11 +387,8 @@ TEST_P(StorageProtocolTest, request_bucket_info) {
// separately until we can figure out if this is by design or not.
EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
- if (GetParam().getMajor() >= 7) {
- EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
- } else {
- EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining);
- }
+ EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
+ EXPECT_TRUE(reply2->supported_node_features().two_phase_remove_location);
}
}
@@ -530,16 +527,57 @@ TEST_P(StorageProtocolTest, destroy_visitor) {
auto reply2 = copyReply(reply);
}
-TEST_P(StorageProtocolTest, remove_location) {
+TEST_P(StorageProtocolTest, legacy_remove_location) {
+ auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_TRUE(cmd2->explicit_remove_set().empty());
+ EXPECT_FALSE(cmd2->only_enumerate_docs());
+
+ uint32_t n_docs_removed = 12345;
+ auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed);
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(n_docs_removed, reply2->documents_removed());
+ EXPECT_TRUE(reply2->selection_matches().empty());
+}
+
+TEST_P(StorageProtocolTest, phase_1_remove_location) {
+ auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ cmd->set_only_enumerate_docs(true);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_TRUE(cmd2->explicit_remove_set().empty());
+ EXPECT_TRUE(cmd2->only_enumerate_docs());
+
+ auto reply = std::make_shared<RemoveLocationReply>(*cmd2, 0);
+ std::vector<spi::IdAndTimestamp> docs;
+ docs.emplace_back(DocumentId("id:foo:bar::baz"), spi::Timestamp(12345));
+ docs.emplace_back(DocumentId("id:foo:bar::zoid"), spi::Timestamp(67890));
+ reply->set_selection_matches(docs);
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(0, reply2->documents_removed());
+ EXPECT_EQ(reply2->selection_matches(), docs);
+}
+
+TEST_P(StorageProtocolTest, phase_2_remove_location) {
auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ std::vector<spi::IdAndTimestamp> docs;
+ docs.emplace_back(DocumentId("id:foo:bar::baz"), spi::Timestamp(12345));
+ docs.emplace_back(DocumentId("id:foo:bar::zoid"), spi::Timestamp(67890));
+ cmd->set_explicit_remove_set(docs);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_FALSE(cmd2->only_enumerate_docs());
+ EXPECT_EQ(cmd2->explicit_remove_set(), docs);
uint32_t n_docs_removed = 12345;
auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed);
auto reply2 = copyReply(reply);
EXPECT_EQ(n_docs_removed, reply2->documents_removed());
+ EXPECT_TRUE(reply2->selection_matches().empty());
}
TEST_P(StorageProtocolTest, stat_bucket) {
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index b4c23725493..c83c5cdd245 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -52,6 +52,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_implicitly_clear_priority_on_schedule(false),
_use_unordered_merge_chaining(false),
_inhibit_default_merges_when_global_merges_pending(false),
+ _enable_two_phase_garbage_collection(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -175,6 +176,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule;
_use_unordered_merge_chaining = config.useUnorderedMergeChaining;
_inhibit_default_merges_when_global_merges_pending = config.inhibitDefaultMergesWhenGlobalMergesPending;
+ _enable_two_phase_garbage_collection = config.enableTwoPhaseGarbageCollection;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index b26f115827e..a18770ac69f 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -119,6 +119,8 @@ public:
const MaintenancePriorities& getMaintenancePriorities() const {
return _maintenancePriorities;
}
+
+ uint8_t default_external_feed_priority() const noexcept { return 120; }
/**
@see setSplitCount
@@ -279,6 +281,12 @@ public:
[[nodiscard]] bool inhibit_default_merges_when_global_merges_pending() const noexcept {
return _inhibit_default_merges_when_global_merges_pending;
}
+ void set_enable_two_phase_garbage_collection(bool enable) noexcept {
+ _enable_two_phase_garbage_collection = enable;
+ }
+ [[nodiscard]] bool enable_two_phase_garbage_collection() const noexcept {
+ return _enable_two_phase_garbage_collection;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -338,6 +346,7 @@ private:
bool _implicitly_clear_priority_on_schedule;
bool _use_unordered_merge_chaining;
bool _inhibit_default_merges_when_global_merges_pending;
+ bool _enable_two_phase_garbage_collection;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 0a858ba37c3..e363f53a4ea 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -299,3 +299,11 @@ use_unordered_merge_chaining bool default=true
## one or more nodes is in maintenance mode in the default bucket space but marked up in
## the global bucket space.
inhibit_default_merges_when_global_merges_pending bool default=true
+
+## If true, garbage collection is performed in two phases (metadata gathering and deletion)
+## instead of just a single phase. Two-phase GC allows for ensuring the same set of documents
+## is deleted across all nodes and explicitly takes write locks on the distributor to prevent
+## concurrent feed ops to GC'd documents from potentially creating inconsistencies.
+## Two-phase GC is only used iff all replica content nodes support the feature AND it's enabled
+## by this config.
+enable_two_phase_garbage_collection bool default=false
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 49b062d6cc1..d1366bc0285 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -93,6 +93,10 @@ public:
return *_operation_sequencer;
}
+ OperationSequencer& operation_sequencer() noexcept override {
+ return *_operation_sequencer;
+ }
+
const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
/**
diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h
index 198427078d6..ef0252661f3 100644
--- a/storage/src/vespa/storage/distributor/distributormessagesender.h
+++ b/storage/src/vespa/storage/distributor/distributormessagesender.h
@@ -29,6 +29,7 @@ public:
virtual PendingMessageTracker& getPendingMessageTracker() = 0;
virtual const PendingMessageTracker& getPendingMessageTracker() const = 0;
virtual const OperationSequencer& operation_sequencer() const noexcept = 0;
+ virtual OperationSequencer& operation_sequencer() noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 9ce8d871fc3..0ada9da29c1 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -63,6 +63,9 @@ public:
const OperationSequencer& operation_sequencer() const noexcept override {
abort(); // Never called by the messages using this component.
}
+ OperationSequencer& operation_sequencer() noexcept override {
+ abort(); // Never called by the messages using this component.
+ }
};
ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ctx,
diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h
index 647e063f93e..87b2a9aef8e 100644
--- a/storage/src/vespa/storage/distributor/node_supported_features.h
+++ b/storage/src/vespa/storage/distributor/node_supported_features.h
@@ -11,11 +11,10 @@ namespace storage::distributor {
* are initially expected to be unsupported.
*/
struct NodeSupportedFeatures {
- bool unordered_merge_chaining = false;
+ bool unordered_merge_chaining = false;
+ bool two_phase_remove_location = false;
- bool operator==(const NodeSupportedFeatures& rhs) const noexcept {
- return unordered_merge_chaining == rhs.unordered_merge_chaining;
- };
+ bool operator==(const NodeSupportedFeatures& rhs) const noexcept = default;
};
}
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index d4afdf2cce7..c43d9f05fc9 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -55,6 +55,10 @@ public:
return _sender.operation_sequencer();
}
+ OperationSequencer& operation_sequencer() noexcept override {
+ return _sender.operation_sequencer();
+ }
+
private:
OperationOwner& _owner;
DistributorStripeMessageSender& _sender;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index e91df1da51e..74ff8371fbe 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -36,6 +36,8 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
{
}
+PutOperation::~PutOperation() = default;
+
void
PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
const api::StorageCommand& originalCommand,
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 61e4678496f..7ec81d6570d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -29,6 +29,7 @@ public:
std::shared_ptr<api::PutCommand> msg,
PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle = SequencingHandle());
+ ~PutOperation() override;
void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "put"; };
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index af3b727e07b..7c04636dc50 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -97,6 +97,10 @@ struct IntermediateMessageSender : DistributorStripeMessageSender {
const OperationSequencer& operation_sequencer() const noexcept override {
return forward.operation_sequencer();
}
+
+ OperationSequencer& operation_sequencer() noexcept override {
+ return forward.operation_sequencer();
+ }
};
IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 7f66d1effd5..a30be1293cd 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -5,44 +5,110 @@
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/vespalib/stllike/hash_set.hpp>
+#include <algorithm>
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.operation.idealstate.remove");
+LOG_SETUP(".distributor.operation.idealstate.gc");
namespace storage::distributor {
GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& cluster_ctx, const BucketAndNodes& nodes)
: IdealStateOperation(nodes),
_tracker(cluster_ctx),
+ _phase(Phase::NotStarted),
+ _cluster_state_version_at_phase1_start_time(0),
+ _phase1_replies_received(0),
+ _remove_candidate_set(),
_replica_info(),
- _max_documents_removed(0)
+ _max_documents_removed(0),
+ _is_done(false)
{}
GarbageCollectionOperation::~GarbageCollectionOperation() = default;
-void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) {
+const char* GarbageCollectionOperation::to_string(Phase phase) noexcept {
+ switch (phase) {
+ case Phase::NotStarted: return "NotStarted";
+ case Phase::LegacySinglePhase: return "LegacySinglePhase";
+ case Phase::ReadMetadataPhase: return "ReadMetadataPhase";
+ case Phase::WriteRemovesPhase: return "WriteRemovesPhase";
+ default: abort();
+ }
+}
+
+bool GarbageCollectionOperation::all_involved_nodes_support_two_phase_gc() const noexcept {
+ const auto& features_repo = _manager->operation_context().node_supported_features_repo();
+ for (uint16_t node : getNodes()) {
+ if (!features_repo.node_supported_features(node).two_phase_remove_location) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::vector<spi::IdAndTimestamp> GarbageCollectionOperation::compile_phase_two_send_set() const {
+ std::vector<spi::IdAndTimestamp> docs_to_remove(_remove_candidate_set.begin(), _remove_candidate_set.end());
+ // Use timestamp order to provide test determinism and allow for backend linear merging (if needed).
+ // Tie-break on GID upon collisions (which technically should never happen...!)
+ auto ts_then_gid_order = [](const spi::IdAndTimestamp& lhs, const spi::IdAndTimestamp& rhs) noexcept {
+ if (lhs.timestamp != rhs.timestamp) {
+ return (lhs.timestamp < rhs.timestamp);
+ }
+ return (lhs.id.getGlobalId() < rhs.id.getGlobalId());
+ };
+ std::sort(docs_to_remove.begin(), docs_to_remove.end(), ts_then_gid_order);
+ return docs_to_remove;
+}
+
+void GarbageCollectionOperation::send_current_phase_remove_locations(DistributorStripeMessageSender& sender) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
std::vector<uint16_t> nodes = entry->getNodes();
+ auto docs_to_remove = compile_phase_two_send_set(); // Always empty unless in phase 2 of two-phase GC
- for (auto node : nodes) {
+ for (size_t i = 0; i < nodes.size(); ++i) {
auto command = std::make_shared<api::RemoveLocationCommand>(
_manager->operation_context().distributor_config().getGarbageCollectionSelection(),
getBucket());
-
- command->setPriority(_priority);
- _tracker.queueCommand(command, node);
+ if (_phase == Phase::ReadMetadataPhase) {
+ command->set_only_enumerate_docs(true);
+ } else if (_phase == Phase::WriteRemovesPhase) {
+ if (i < nodes.size() - 1) {
+ command->set_explicit_remove_set(docs_to_remove);
+ } else {
+ command->set_explicit_remove_set(std::move(docs_to_remove));
+ }
+ } // else: legacy command
+ command->setPriority((_phase != Phase::WriteRemovesPhase)
+ ? _priority
+ : _manager->operation_context().distributor_config().default_external_feed_priority());
+ _tracker.queueCommand(command, nodes[i]);
}
-
_tracker.flushQueue(sender);
+}
+void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) {
+ if (_manager->operation_context().distributor_config().enable_two_phase_garbage_collection() &&
+ all_involved_nodes_support_two_phase_gc())
+ {
+ _cluster_state_version_at_phase1_start_time = _bucketSpace->getClusterState().getVersion();
+ LOG(debug, "Starting first phase of two-phase GC for %s at cluster state version %u",
+ getBucket().toString().c_str(), _cluster_state_version_at_phase1_start_time);
+ transition_to(Phase::ReadMetadataPhase);
+ } else {
+ LOG(debug, "Starting legacy single-phase GC for %s", getBucket().toString().c_str());
+ transition_to(Phase::LegacySinglePhase);
+ }
+ send_current_phase_remove_locations(sender);
if (_tracker.finished()) {
done();
}
}
void
-GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&,
+GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply)
{
auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get());
@@ -51,33 +117,154 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&,
uint16_t node = _tracker.handleReply(*rep);
if (!rep->getResult().failed()) {
- _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
- node, rep->getBucketInfo());
- _max_documents_removed = std::max(_max_documents_removed, rep->documents_removed());
+ if (_phase == Phase::LegacySinglePhase) {
+ handle_ok_legacy_reply(node, *rep);
+ } else if (_phase == Phase::ReadMetadataPhase) {
+ handle_ok_phase1_reply(*rep);
+ } else {
+ assert(_phase == Phase::WriteRemovesPhase);
+ handle_ok_phase2_reply(node, *rep);
+ }
} else {
_ok = false;
}
if (_tracker.finished()) {
+ const bool op_complete = (!_ok || _phase == Phase::LegacySinglePhase || _phase == Phase::WriteRemovesPhase);
if (_ok) {
- merge_received_bucket_info_into_db();
+ if (op_complete) {
+ merge_received_bucket_info_into_db();
+ } else {
+ assert(_phase == Phase::ReadMetadataPhase);
+ on_metadata_read_phase_done(sender);
+ }
+ }
+ if (op_complete) {
+ mark_operation_complete();
}
- update_gc_metrics();
- done();
}
}
-void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
- // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
+ _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
+ from_node, reply.getBucketInfo());
+ _max_documents_removed = std::max(_max_documents_removed, reply.documents_removed());
+}
+
+void GarbageCollectionOperation::handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
+ update_replica_response_info_from_reply(from_node, reply);
+}
+
+GarbageCollectionOperation::RemoveCandidateSet
+GarbageCollectionOperation::steal_selection_matches_as_set(api::RemoveLocationReply& reply) {
+ auto candidates = reply.steal_selection_matches();
+ RemoveCandidateSet set;
+ set.resize(candidates.size());
+ for (auto& cand : candidates) {
+ set.insert(std::move(cand));
+ }
+ return set;
+}
+
+void GarbageCollectionOperation::handle_ok_phase1_reply(api::RemoveLocationReply& reply) {
+ assert(reply.documents_removed() == 0);
+ if (_phase1_replies_received == 0) {
+ // Establish baseline candidate set. Since we require an intersection between all
+ // sets, the number of candidates can never be _greater_ than that of the first reply.
+ _remove_candidate_set = steal_selection_matches_as_set(reply);
+ } else if (!_remove_candidate_set.empty()) {
+ auto their_set = steal_selection_matches_as_set(reply);
+ std::vector<spi::IdAndTimestamp> to_remove;
+ for (auto& our_cand : _remove_candidate_set) {
+ if (!their_set.contains(our_cand)) {
+ to_remove.emplace_back(our_cand);
+ }
+ }
+ for (auto& rm_entry : to_remove) {
+ _remove_candidate_set.erase(rm_entry);
+ }
+ }
+ ++_phase1_replies_received;
+}
+
+void GarbageCollectionOperation::handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
+ update_replica_response_info_from_reply(from_node, reply);
+}
+
+bool GarbageCollectionOperation::may_start_write_phase() const {
+ if (!_ok) {
+ return false; // Already broken, no reason to proceed.
+ }
+ const auto state_version_now = _bucketSpace->getClusterState().getVersion();
+ if ((state_version_now != _cluster_state_version_at_phase1_start_time) ||
+ _bucketSpace->has_pending_cluster_state())
+ {
+ LOG(debug, "GC(%s): not sending write phase; cluster state has changed, or a change is pending",
+ getBucket().toString().c_str());
+ return false;
+ }
+ // If bucket is gone, or has become inconsistently split, abort mission.
+ std::vector<BucketDatabase::Entry> entries;
+ _bucketSpace->getBucketDatabase().getAll(getBucketId(), entries);
+ if ((entries.size() != 1) || (entries[0].getBucketId() != getBucketId())) {
+ LOG(debug, "GC(%s): not sending write phase; bucket has become inconsistent",
+ getBucket().toString().c_str());
+ return false;
+ }
+ return true;
+}
+
+void GarbageCollectionOperation::on_metadata_read_phase_done(DistributorStripeMessageSender& sender) {
+ if (!may_start_write_phase()) {
+ _ok = false;
+ mark_operation_complete();
+ return;
+ }
+ std::vector<spi::IdAndTimestamp> already_pending_write;
+ for (auto& cand : _remove_candidate_set) {
+ auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.id);
+ if (maybe_seq_token.valid()) {
+ _gc_write_locks.emplace_back(std::move(maybe_seq_token));
+ LOG(spam, "GC(%s): acquired write lock for '%s'; adding to GC set",
+ getBucket().toString().c_str(), cand.id.toString().c_str());
+ } else {
+ already_pending_write.emplace_back(cand);
+ LOG(spam, "GC(%s): failed to acquire write lock for '%s'; not including in GC set",
+ getBucket().toString().c_str(), cand.id.toString().c_str());
+ }
+ }
+ for (auto& rm_entry : already_pending_write) {
+ _remove_candidate_set.erase(rm_entry);
+ }
+ if (_remove_candidate_set.empty()) {
+ update_last_gc_timestamp_in_db(); // Nothing to remove now, try again later.
+ mark_operation_complete();
+ return;
+ }
+ LOG(debug, "GC(%s): Sending phase 2 GC with %zu entries (with acquired write locks). "
+ "%zu documents had pending writes and could not be GCd at this time",
+ getBucket().toString().c_str(), _remove_candidate_set.size(), already_pending_write.size());
+ transition_to(Phase::WriteRemovesPhase);
+ send_current_phase_remove_locations(sender);
+}
+
+void GarbageCollectionOperation::update_last_gc_timestamp_in_db() {
BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
if (dbentry.valid()) {
dbentry->setLastGarbageCollectionTime(
_manager->node_context().clock().getTimeInSeconds().getTime());
+ LOG(debug, "GC(%s): Tagging bucket completed at time %u",
+ getBucket().toString().c_str(), dbentry->getLastGarbageCollectionTime());
_bucketSpace->getBucketDatabase().update(dbentry);
}
}
+void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
+ // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
+ _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+ update_last_gc_timestamp_in_db();
+}
+
void GarbageCollectionOperation::update_gc_metrics() {
auto metric_base = _manager->getMetrics().operations[IdealStateOperation::GARBAGE_COLLECTION];
auto gc_metrics = std::dynamic_pointer_cast<GcMetricSet>(metric_base);
@@ -85,6 +272,21 @@ void GarbageCollectionOperation::update_gc_metrics() {
gc_metrics->documents_removed.inc(_max_documents_removed);
}
+void GarbageCollectionOperation::mark_operation_complete() {
+ assert(!_is_done);
+ if (_ok) {
+ update_gc_metrics();
+ }
+ done();
+ _is_done = true;
+}
+
+void GarbageCollectionOperation::transition_to(Phase new_phase) {
+ LOG(spam, "GC(%s): state transition %s -> %s",
+ getBucket().toString().c_str(), to_string(_phase), to_string(new_phase));
+ _phase = new_phase;
+}
+
bool
GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const {
return true;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index f51739242b7..adbbd210877 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -4,14 +4,16 @@
#include "idealstateoperation.h"
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
+#include <vespa/vespalib/stllike/hash_set.h>
#include <vector>
namespace storage::distributor {
class PendingMessageTracker;
-class GarbageCollectionOperation : public IdealStateOperation
-{
+class GarbageCollectionOperation final : public IdealStateOperation {
public:
GarbageCollectionOperation(const ClusterContext& cluster_ctx,
const BucketAndNodes& nodes);
@@ -22,15 +24,51 @@ public:
const char* getName() const override { return "garbagecollection"; };
Type getType() const override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
+ bool is_two_phase() const noexcept {
+ return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
+ }
+ bool is_done() const noexcept { return _is_done; }
protected:
MessageTracker _tracker;
private:
- std::vector<BucketCopy> _replica_info;
- uint32_t _max_documents_removed;
+ enum class Phase {
+ NotStarted,
+ LegacySinglePhase,
+ ReadMetadataPhase,
+ WriteRemovesPhase
+ };
+ static const char* to_string(Phase phase) noexcept;
+
+ using RemoveCandidateSet = vespalib::hash_set<spi::IdAndTimestamp, spi::IdAndTimestamp::hash>;
+
+ Phase _phase;
+ uint32_t _cluster_state_version_at_phase1_start_time;
+ uint32_t _phase1_replies_received;
+ RemoveCandidateSet _remove_candidate_set;
+ std::vector<SequencingHandle> _gc_write_locks;
+ std::vector<BucketCopy> _replica_info;
+ uint32_t _max_documents_removed;
+ bool _is_done;
+
+ static RemoveCandidateSet steal_selection_matches_as_set(api::RemoveLocationReply& reply);
+
+ void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
+ std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
+
+ void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
+ void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
+ void handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
+ void update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
+ void on_metadata_read_phase_done(DistributorStripeMessageSender& sender);
+ [[nodiscard]] bool may_start_write_phase() const;
+ [[nodiscard]] bool all_involved_nodes_support_two_phase_gc() const noexcept;
+ void update_last_gc_timestamp_in_db();
void merge_received_bucket_info_into_db();
void update_gc_metrics();
+ void mark_operation_complete();
+ void transition_to(Phase new_phase);
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index f8f35afe821..7e62506b77f 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -179,8 +179,9 @@ public:
/**
Set the priority we should send messages from this operation with.
*/
- void setPriority(api::StorageMessage::Priority priority)
- { _priority = priority; }
+ void setPriority(api::StorageMessage::Priority priority) noexcept {
+ _priority = priority;
+ }
/**
* Returns true if we are blocked to start this operation given
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 30071d4d962..dadbda60021 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -340,7 +340,8 @@ PendingClusterState::update_node_supported_features_from_reply(uint16_t node, co
{
const auto& src_feat = reply.supported_node_features();
NodeSupportedFeatures dest_feat;
- dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining;
+ dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining;
+ dest_feat.two_phase_remove_location = src_feat.two_phase_remove_location;
// This will overwrite per bucket-space reply, but does not matter since it's independent of bucket space.
_node_features.insert(std::make_pair(node, dest_feat));
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index d5bf733a30c..cc1bd557298 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -406,19 +406,34 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
{
tracker->setMetric(_env._metrics.removeLocation);
- LOG(debug, "RemoveLocation(%s): using selection '%s'",
- cmd.getBucketId().toString().c_str(),
- cmd.getDocumentSelection().c_str());
-
spi::Bucket bucket(cmd.getBucket());
- UnrevertableRemoveEntryProcessor::DocumentIdsAndTimeStamps to_remove;
- UnrevertableRemoveEntryProcessor processor(to_remove);
-
- {
- auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
- BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
- std::make_shared<document::DocIdOnly>(),
- processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ const bool is_legacy = (!cmd.only_enumerate_docs() && cmd.explicit_remove_set().empty());
+ std::vector<spi::IdAndTimestamp> to_remove;
+
+ LOG(debug, "RemoveLocation(%s): using selection '%s' (enumerate only: %s, remove set size: %zu)",
+ bucket.toString().c_str(),
+ cmd.getDocumentSelection().c_str(),
+ (cmd.only_enumerate_docs() ? "yes" : "no"),
+ cmd.explicit_remove_set().size());
+
+ if (is_legacy || cmd.only_enumerate_docs()) {
+ UnrevertableRemoveEntryProcessor processor(to_remove);
+ {
+ auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ std::make_shared<document::DocIdOnly>(),
+ processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ }
+ if (!is_legacy) {
+ LOG(debug, "RemoveLocation(%s): returning 1st phase results with %zu entries",
+ bucket.toString().c_str(), to_remove.size());
+ auto reply = std::make_shared<api::RemoveLocationReply>(cmd, 0); // No docs removed yet
+ reply->set_selection_matches(std::move(to_remove));
+ tracker->setReply(std::move(reply));
+ return tracker;
+ }
+ } else {
+ to_remove = cmd.steal_explicit_remove_set();
}
auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) {
@@ -427,6 +442,8 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
tracker->sendReply();
});
+ // In the case where a _newer_ mutation exists for a given entry in to_remove, it will be ignored
+ // (with no tombstone added) since we only preserve the newest operation for a document.
_spi.removeAsync(bucket, std::move(to_remove),
std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index cb923db3c3c..92c0fdc0b87 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -88,9 +88,26 @@ message RevertResponse {
BucketId remapped_bucket_id = 2;
}
+message IdAndTimestamp {
+ DocumentId id = 1;
+ uint64 timestamp = 2;
+}
+
+message PhaseOneRemove {
+ // Currently empty; its presence is enough
+}
+
+message PhaseTwoRemove {
+ repeated IdAndTimestamp explicit_remove_set = 1;
+}
+
message RemoveLocationRequest {
- Bucket bucket = 1;
- bytes document_selection = 2;
+ Bucket bucket = 1;
+ bytes document_selection = 2;
+ oneof phased_remove {
+ PhaseOneRemove phase_one = 3;
+ PhaseTwoRemove phase_two = 4;
+ }
}
message RemoveLocationStats {
@@ -98,7 +115,8 @@ message RemoveLocationStats {
}
message RemoveLocationResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- RemoveLocationStats stats = 3;
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ RemoveLocationStats stats = 3;
+ repeated IdAndTimestamp selection_matches = 4; // Iff reply to phase 1 remove
}
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 7f7ab1d7c0b..a79ac9fd99a 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -110,7 +110,8 @@ message BucketAndBucketInfo {
}
message SupportedNodeFeatures {
- bool unordered_merge_chaining = 1;
+ bool unordered_merge_chaining = 1;
+ bool two_phase_remove_location = 2;
}
message RequestBucketInfoResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index dee766b4b2d..662408c4e95 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -646,30 +646,83 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cm
// RemoveLocation
// -----------------------------------------------------------------
+namespace {
+
+void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) {
+ *dest.mutable_id() = src.toString();
+}
+
+document::DocumentId get_document_id(const protobuf::DocumentId& src) {
+ return document::DocumentId(src.id()); // id() shall always be null terminated
+}
+
+void set_id_and_timestamp_vector(::google::protobuf::RepeatedPtrField<protobuf::IdAndTimestamp>& dest,
+ const std::vector<spi::IdAndTimestamp>& src)
+{
+ dest.Reserve(src.size());
+ for (const auto& src_entry : src) {
+ auto* dest_entry = dest.Add();
+ dest_entry->set_timestamp(src_entry.timestamp);
+ set_document_id(*dest_entry->mutable_id(), src_entry.id);
+ }
+}
+
+std::vector<spi::IdAndTimestamp>
+get_id_and_timestamp_vector(const ::google::protobuf::RepeatedPtrField<protobuf::IdAndTimestamp>& src)
+{
+ std::vector<spi::IdAndTimestamp> vec;
+ vec.reserve(src.size());
+ for (const auto& src_entry : src) {
+ vec.emplace_back(get_document_id(src_entry.id()), spi::Timestamp(src_entry.timestamp()));
+ }
+ return vec;
+}
+
+}
+
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ if (msg.only_enumerate_docs()) {
+ req.mutable_phase_one(); // Instantiating it is enough
+ } else if (!msg.explicit_remove_set().empty()) {
+ set_id_and_timestamp_vector(*req.mutable_phase_two()->mutable_explicit_remove_set(),
+ msg.explicit_remove_set());
+ }
});
}
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, [&](auto& res) {
res.mutable_stats()->set_documents_removed(msg.documents_removed());
+ if (!msg.selection_matches().empty()) {
+ set_id_and_timestamp_vector(*res.mutable_selection_matches(), msg.selection_matches());
+ }
});
}
api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
- return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ auto cmd = std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ if (req.has_phase_one()) {
+ cmd->set_only_enumerate_docs(true);
+ } else if (req.has_phase_two()) {
+ cmd->set_explicit_remove_set(get_id_and_timestamp_vector(req.phase_two().explicit_remove_set()));
+ }
+ return cmd;
});
}
api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&](auto& res) {
uint32_t documents_removed = (res.has_stats() ? res.stats().documents_removed() : 0u);
- return std::make_unique<api::RemoveLocationReply>(
+ auto reply = std::make_unique<api::RemoveLocationReply>(
static_cast<const api::RemoveLocationCommand&>(cmd),
documents_removed);
+ if (!res.selection_matches().empty()) {
+ reply->set_selection_matches(get_id_and_timestamp_vector(res.selection_matches()));
+ }
+ return reply;
});
}
@@ -1004,6 +1057,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe
// We mark features as available at protocol level. Only included for full bucket fetch responses.
if (msg.full_bucket_fetch()) {
res.mutable_supported_node_features()->set_unordered_merge_chaining(true);
+ res.mutable_supported_node_features()->set_two_phase_remove_location(true);
}
});
}
@@ -1044,7 +1098,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
if (res.has_supported_node_features()) {
const auto& src_features = res.supported_node_features();
auto& dest_features = reply->supported_node_features();
- dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+ dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+ dest_features.two_phase_remove_location = src_features.two_phase_remove_location();
}
return reply;
});
diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h
index 47785a92039..d9843236d2e 100644
--- a/storage/src/vespa/storageapi/message/bucket.h
+++ b/storage/src/vespa/storageapi/message/bucket.h
@@ -393,7 +393,8 @@ public:
friend std::ostream& operator<<(std::ostream& os, const Entry&);
};
struct SupportedNodeFeatures {
- bool unordered_merge_chaining = false;
+ bool unordered_merge_chaining = false;
+ bool two_phase_remove_location = false;
};
using EntryVector = vespalib::Array<Entry>;
private:
diff --git a/storage/src/vespa/storageapi/message/removelocation.cpp b/storage/src/vespa/storageapi/message/removelocation.cpp
index 7b7ed894b2c..5d558c5e305 100644
--- a/storage/src/vespa/storageapi/message/removelocation.cpp
+++ b/storage/src/vespa/storageapi/message/removelocation.cpp
@@ -11,15 +11,17 @@ IMPLEMENT_REPLY(RemoveLocationReply)
RemoveLocationCommand::RemoveLocationCommand(vespalib::stringref documentSelection,
const document::Bucket &bucket)
: BucketInfoCommand(MessageType::REMOVELOCATION, bucket),
- _documentSelection(documentSelection)
+ _documentSelection(documentSelection),
+ _explicit_remove_set(),
+ _only_enumerate_docs(false)
{}
-RemoveLocationCommand::~RemoveLocationCommand() {}
+RemoveLocationCommand::~RemoveLocationCommand() = default;
void
RemoveLocationCommand::print(std::ostream& out, bool verbose, const std::string& indent) const
{
- if (_documentSelection.length()) {
+ if (!_documentSelection.empty()) {
out << "Remove selection(" << _documentSelection << "): ";
}
BucketInfoCommand::print(out, verbose, indent);
diff --git a/storage/src/vespa/storageapi/message/removelocation.h b/storage/src/vespa/storageapi/message/removelocation.h
index 276b090cc57..4e7c1d26711 100644
--- a/storage/src/vespa/storageapi/message/removelocation.h
+++ b/storage/src/vespa/storageapi/message/removelocation.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/storageapi/defs.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storageapi/messageapi/storagecommand.h>
#include <vespa/storageapi/messageapi/bucketinforeply.h>
@@ -15,13 +16,34 @@ public:
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
const vespalib::string& getDocumentSelection() const { return _documentSelection; }
+ // TODO move to factory pattern instead to disallow creating illegal combinations
+ void set_only_enumerate_docs(bool only_enumerate) noexcept {
+ _only_enumerate_docs = only_enumerate;
+ }
+ [[nodiscard]] bool only_enumerate_docs() const noexcept {
+ return _only_enumerate_docs;
+ }
+ void set_explicit_remove_set(std::vector<spi::IdAndTimestamp> remove_set) {
+ _explicit_remove_set = std::move(remove_set);
+ }
+ const std::vector<spi::IdAndTimestamp>& explicit_remove_set() const noexcept {
+ return _explicit_remove_set;
+ }
+    std::vector<spi::IdAndTimestamp> steal_explicit_remove_set() noexcept {
+        return std::move(_explicit_remove_set);
+    }
+
DECLARE_STORAGECOMMAND(RemoveLocationCommand, onRemoveLocation);
private:
+ // TODO make variant? Only one of the two may be used
vespalib::string _documentSelection;
+ std::vector<spi::IdAndTimestamp> _explicit_remove_set;
+ bool _only_enumerate_docs;
};
class RemoveLocationReply : public BucketInfoReply
{
+ std::vector<spi::IdAndTimestamp> _selection_matches; // For use in 1st phase GC
uint32_t _documents_removed;
public:
explicit RemoveLocationReply(const RemoveLocationCommand& cmd, uint32_t docs_removed = 0);
@@ -29,6 +51,16 @@ public:
_documents_removed = docs_removed;
}
uint32_t documents_removed() const noexcept { return _documents_removed; }
+ // TODO refactor
+ void set_selection_matches(std::vector<spi::IdAndTimestamp> matches) noexcept {
+ _selection_matches = std::move(matches);
+ }
+ const std::vector<spi::IdAndTimestamp>& selection_matches() const noexcept {
+ return _selection_matches;
+ }
+ std::vector<spi::IdAndTimestamp> steal_selection_matches() noexcept {
+ return std::move(_selection_matches);
+ }
DECLARE_STORAGEREPLY(RemoveLocationReply, onRemoveLocationReply)
};