summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-02 13:27:11 +0100
committerGitHub <noreply@github.com>2020-12-02 13:27:11 +0100
commitb984156c6295acb407b577ffab0055bf767fe892 (patch)
tree7525656158724f496a78b9dfa817f45abb338508 /storage
parent512f4e8371f2fbdfba121148e2fe77003cbdf7ed (diff)
parent4f4bbe5490124bab25679c6c6ac92f3ee81ba74b (diff)
Merge pull request #15572 from vespa-engine/vekterli/initial-backend-reindexing-visitor-support
Initial support for backend reindexing visitor functionality
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp77
-rw-r--r--storage/src/tests/distributor/operation_sequencer_test.cpp78
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp69
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp124
-rw-r--r--storage/src/vespa/storage/common/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/common/reindexing_constants.cpp12
-rw-r--r--storage/src/vespa/storage/common/reindexing_constants.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp77
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h30
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.cpp47
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.h80
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp74
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h48
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp84
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.h32
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp77
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h81
-rw-r--r--storage/src/vespa/storage/visiting/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/visiting/dumpvisitorsingle.h3
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.cpp36
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.h36
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp2
26 files changed, 941 insertions, 146 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 96df1a842a4..810ffb550bf 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -32,6 +32,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
pendingmessagetrackertest.cpp
persistence_metrics_set_test.cpp
putoperationtest.cpp
+ read_for_write_visitor_operation_test.cpp
removebucketoperationtest.cpp
removelocationtest.cpp
removeoperationtest.cpp
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index e628b632215..40516622b65 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -6,7 +6,10 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/operations/external/getoperation.h>
+#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/visitor.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/document/fieldset/fieldsets.h>
@@ -561,6 +564,80 @@ TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_conf
do_test_get_weak_consistency_is_propagated(true);
}
+struct OperationHandlerSequencingTest : ExternalOperationHandlerTest {
+ void SetUp() override {
+ set_up_distributor_for_sequencing_test();
+ }
+
+ static documentapi::TestAndSetCondition bucket_lock_bypass_tas_condition() {
+ return documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value());
+ }
+};
+
+TEST_F(OperationHandlerSequencingTest, put_not_allowed_through_locked_bucket_if_special_tas_token_not_present) {
+ auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
+ auto bucket = makeDocumentBucket(document::BucketId(16, 1));
+ auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket);
+ ASSERT_TRUE(bucket_handle.valid());
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
+}
+
+TEST_F(OperationHandlerSequencingTest, put_allowed_through_locked_bucket_if_special_tas_token_present) {
+ set_up_distributor_for_sequencing_test();
+
+ auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
+ put->setCondition(bucket_lock_bypass_tas_condition());
+
+ auto bucket = makeDocumentBucket(document::BucketId(16, 1));
+ auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket);
+ ASSERT_TRUE(bucket_handle.valid());
+
+ Operation::SP op;
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(put, op));
+}
+
+TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_no_bucket_lock_present) {
+ auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
+ put->setCondition(bucket_lock_bypass_tas_condition());
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
+ // TODO determine most appropriate error code here. Want to fail the bucket but
+ // not the entire visitor operation. Will likely need to be revisited (heh!) soon.
+ EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, "
+ "but none currently exists)",
+ _sender.reply(0)->getResult().toString());
+}
+
+// This test is a variation of the above, but whereas it tests the case where _no_ lock is
+// present, this tests the case where a lock is present but it's not a bucket-level lock.
+TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_document_lock_present) {
+ auto put = makePutCommand("testdoctype1", _dummy_id);
+ put->setCondition(bucket_lock_bypass_tas_condition());
+ Operation::SP op;
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op));
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put)));
+ EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, "
+ "but none currently exists)",
+ _sender.reply(0)->getResult().toString());
+}
+
+TEST_F(OperationHandlerSequencingTest, reindexing_visitor_creates_read_for_write_operation) {
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(
+ document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", "");
+ Operation::SP op;
+ getExternalOperationHandler().handleMessage(cmd, op);
+ ASSERT_TRUE(op.get() != nullptr);
+ ASSERT_TRUE(dynamic_cast<ReadForWriteVisitorOperationStarter*>(op.get()) != nullptr);
+}
+
+TEST_F(OperationHandlerSequencingTest, reindexing_visitor_library_check_is_case_insensitive) {
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(
+ document::FixedBucketSpaces::default_space(), "ReIndexingVisitor", "foo", "");
+ Operation::SP op;
+ getExternalOperationHandler().handleMessage(cmd, op);
+ ASSERT_TRUE(op.get() != nullptr);
+ ASSERT_TRUE(dynamic_cast<ReadForWriteVisitorOperationStarter*>(op.get()) != nullptr);
+}
+
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the
diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp
index b3674ee2126..37651be05a4 100644
--- a/storage/src/tests/distributor/operation_sequencer_test.cpp
+++ b/storage/src/tests/distributor/operation_sequencer_test.cpp
@@ -1,46 +1,94 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/document/base/documentid.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/vespalib/gtest/gtest.h>
namespace storage::distributor {
using document::DocumentId;
+using namespace ::testing;
-TEST(OperationSequencerTest, can_get_sequencing_handle_for_id_without_existing_handle) {
+namespace {
+
+constexpr document::BucketSpace default_space() {
+ return document::FixedBucketSpaces::default_space();
+}
+
+constexpr document::BucketSpace global_space() {
+ return document::FixedBucketSpaces::global_space();
+}
+
+}
+
+struct OperationSequencerTest : Test {
OperationSequencer sequencer;
- auto handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+};
+
+TEST_F(OperationSequencerTest, can_get_sequencing_handle_for_id_without_existing_handle) {
+ auto handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
EXPECT_TRUE(handle.valid());
+ EXPECT_FALSE(handle.is_blocked());
}
-TEST(OperationSequencerTest, cannot_get_sequencing_handle_for_id_with_existing_handle) {
- OperationSequencer sequencer;
- auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
- auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+TEST_F(OperationSequencerTest, cannot_get_sequencing_handle_for_id_with_existing_handle) {
+ auto first_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
+ auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
EXPECT_FALSE(second_handle.valid());
+ ASSERT_TRUE(second_handle.is_blocked());
+ EXPECT_EQ(second_handle.blocked_by(), SequencingHandle::BlockedBy::PendingOperation);
}
-TEST(OperationSequencerTest, can_get_sequencing_handle_for_different_ids) {
- OperationSequencer sequencer;
- auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
- auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::efgh"));
+TEST_F(OperationSequencerTest, can_get_sequencing_handle_for_different_ids) {
+ auto first_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
+ auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::efgh"));
EXPECT_TRUE(first_handle.valid());
EXPECT_TRUE(second_handle.valid());
}
-TEST(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_for_id) {
- OperationSequencer sequencer;
- auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_for_id) {
+ auto first_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
// Explicit release
first_handle.release();
{
- auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
EXPECT_TRUE(second_handle.valid());
// Implicit release by scope exit
}
- auto third_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ auto third_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
EXPECT_TRUE(third_handle.valid());
}
+TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) {
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ EXPECT_TRUE(bucket_handle.valid());
+ auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
+ EXPECT_FALSE(doc_handle.valid());
+ ASSERT_TRUE(doc_handle.is_blocked());
+ EXPECT_EQ(doc_handle.blocked_by(), SequencingHandle::BlockedBy::LockedBucket);
+}
+
+TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bucket) {
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ EXPECT_TRUE(bucket_handle.valid());
+ // Note: different sub-bucket than the lock
+ auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=2:abcd"));
+ EXPECT_TRUE(doc_handle.valid());
+}
+
+TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) {
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ bucket_handle.release();
+ auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
+ EXPECT_TRUE(doc_handle.valid());
+}
+
+TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) {
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ EXPECT_TRUE(bucket_handle.valid());
+ auto doc_handle = sequencer.try_acquire(global_space(), DocumentId("id:foo:test:n=1:abcd"));
+ EXPECT_TRUE(doc_handle.valid());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 90171db97d1..71b51c9a7b6 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -1,13 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/document/base/testdocman.h>
+#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
-#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <tests/common/dummystoragelink.h>
-#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
@@ -83,6 +83,16 @@ public:
_tracker->reply(*putReply);
}
+ std::shared_ptr<api::PutCommand> createPutToNode(uint16_t node) const {
+ document::BucketId bucket(16, 1234);
+ auto cmd = std::make_shared<api::PutCommand>(
+ makeDocumentBucket(bucket),
+ createDummyDocumentForBucket(bucket),
+ api::Timestamp(123456));
+ cmd->setAddress(makeStorageAddress(node));
+ return cmd;
+ }
+
PendingMessageTracker& tracker() { return *_tracker; }
auto& clock() { return _clock; }
@@ -98,16 +108,6 @@ private:
return _testDocMan.createDocument("foobar", createDummyIdString(bucket));
}
- std::shared_ptr<api::PutCommand> createPutToNode(uint16_t node) const {
- document::BucketId bucket(16, 1234);
- auto cmd = std::make_shared<api::PutCommand>(
- makeDocumentBucket(bucket),
- createDummyDocumentForBucket(bucket),
- api::Timestamp(123456));
- cmd->setAddress(makeStorageAddress(node));
- return cmd;
- }
-
std::shared_ptr<api::RemoveCommand> createRemoveToNode(
uint16_t node) const
{
@@ -436,4 +436,49 @@ TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) {
EXPECT_FALSE(f.tracker().getNodeInfo().isBusy(0));
}
+namespace {
+
+document::BucketId bucket_of(const document::DocumentId& id) {
+ return document::BucketId(16, id.getGlobalId().convertToBucketId().getId());
+}
+
+}
+
+TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ops) {
+ Fixture f;
+ auto cmd = f.createPutToNode(0);
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::Aborted;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::OK);
+}
+
+TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::Aborted;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::Aborted);
+ f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be run as part of this.
+ EXPECT_EQ(state, TaskRunState::OK);
+}
+
+TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::OK;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::OK);
+ f.tracker().abort_deferred_tasks();
+ EXPECT_EQ(state, TaskRunState::Aborted);
+}
+
}
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
new file mode 100644
index 00000000000..76112b1c729
--- /dev/null
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -0,0 +1,124 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/base/testdocman.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
+#include <vespa/storage/distributor/operations/external/visitoroperation.h>
+#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <tests/distributor/distributortestutil.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+using document::Bucket;
+using document::BucketId;
+
+namespace storage::distributor {
+
+struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
+ document::TestDocMan _test_doc_man;
+ VisitorOperation::Config _default_config;
+ std::unique_ptr<OperationOwner> _op_owner;
+ BucketId _superbucket;
+ BucketId _sub_bucket;
+
+ ReadForWriteVisitorOperationStarterTest()
+ : _test_doc_man(),
+ _default_config(100, 100),
+ _op_owner(),
+ _superbucket(16, 4),
+ _sub_bucket(17, 4)
+ {}
+
+ void SetUp() override {
+ createLinks();
+ setupDistributor(1, 1, "version:1 distributor:1 storage:1");
+ _op_owner = std::make_unique<OperationOwner>(_sender, getClock());
+
+ addNodesToBucketDB(_sub_bucket, "0=1/2/3/t");
+ }
+
+ void TearDown() override {
+ close();
+ }
+
+ static Bucket default_bucket(BucketId id) {
+ return Bucket(document::FixedBucketSpaces::default_space(), id);
+ }
+
+ std::shared_ptr<VisitorOperation> create_nested_visitor_op(bool valid_command = true) {
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(
+ document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", "");
+ if (valid_command) {
+ cmd->addBucketToBeVisited(_superbucket);
+ cmd->addBucketToBeVisited(BucketId()); // Will be inferred to first sub-bucket in DB
+ }
+ return std::make_shared<VisitorOperation>(
+ getExternalOperationHandler(), getExternalOperationHandler(),
+ getDistributorBucketSpace(), cmd, _default_config,
+ getDistributor().getMetrics().visits);
+ }
+
+ OperationSequencer& operation_sequencer() {
+ return getExternalOperationHandler().operation_sequencer();
+ }
+
+ std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) {
+ return std::make_shared<ReadForWriteVisitorOperationStarter>(
+ std::move(visitor_op), operation_sequencer(),
+ *_op_owner, getDistributor().getPendingMessageTracker());
+ }
+};
+
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_that_fails_precondition_checks_is_immediately_failed) {
+ auto op = create_rfw_op(create_nested_visitor_op(false));
+ _op_owner->start(op, OperationStarter::Priority(120));
+ EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
+ "ReturnCode(ILLEGAL_PARAMETERS, No buckets in CreateVisitorCommand for visitor 'foo')",
+ _sender.getLastReply());
+}
+
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_no_pending_ops_to_bucket) {
+ auto op = create_rfw_op(create_nested_visitor_op(true));
+ _op_owner->start(op, OperationStarter::Priority(120));
+ ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
+}
+
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_start_deferred_if_pending_ops_to_bucket) {
+ auto op = create_rfw_op(create_nested_visitor_op(true));
+ // Pending mutating op to same bucket, prevents visitor from starting
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _test_doc_man.getTypeRepo(),
+ *_test_doc_man.getTypeRepo().getDocumentType("testdoctype1"),
+ document::DocumentId("id::testdoctype1:n=4:foo"));
+ auto update_cmd = std::make_shared<api::UpdateCommand>(
+ default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0));
+
+ Operation::SP mutating_op;
+ getExternalOperationHandler().handleMessage(update_cmd, mutating_op);
+ ASSERT_TRUE(mutating_op);
+ _op_owner->start(mutating_op, OperationStarter::Priority(120));
+ ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0",
+ _sender.getCommands(true, true));
+ // Since pending message tracking normally happens in the distributor itself during sendUp,
+ // we have to emulate this and explicitly insert the sent message into the pending mapping.
+ getDistributor().getPendingMessageTracker().insert(_sender.command(0));
+
+ _op_owner->start(op, OperationStarter::Priority(120));
+ // Nothing started yet
+ ASSERT_EQ("", _sender.getCommands(true, false, 1));
+
+ // Pretend update operation completed
+ auto update_reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply());
+ getDistributor().getPendingMessageTracker().reply(*update_reply);
+ _op_owner->handleReply(update_reply);
+
+ // Visitor should now be started!
+ ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true, false, 1));
+}
+
+}
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt
index 81c6486eaeb..29130515d2a 100644
--- a/storage/src/vespa/storage/common/CMakeLists.txt
+++ b/storage/src/vespa/storage/common/CMakeLists.txt
@@ -9,6 +9,7 @@ vespa_add_library(storage_common OBJECT
global_bucket_space_distribution_converter.cpp
messagebucket.cpp
messagesender.cpp
+ reindexing_constants.cpp
servicelayercomponent.cpp
statusmessages.cpp
statusmetricconsumer.cpp
diff --git a/storage/src/vespa/storage/common/reindexing_constants.cpp b/storage/src/vespa/storage/common/reindexing_constants.cpp
new file mode 100644
index 00000000000..c8330a43ffb
--- /dev/null
+++ b/storage/src/vespa/storage/common/reindexing_constants.cpp
@@ -0,0 +1,12 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "reindexing_constants.h"
+
+namespace storage {
+
+const char* reindexing_bucket_lock_bypass_value() noexcept {
+ // This is by design a string that will fail to parse as a valid document selection in the backend.
+ // It's only used to bypass the read-for-write visitor bucket lock.
+ return "@@__vespa_internal_allow_through_bucket_lock";
+}
+
+}
diff --git a/storage/src/vespa/storage/common/reindexing_constants.h b/storage/src/vespa/storage/common/reindexing_constants.h
new file mode 100644
index 00000000000..be7d47b503c
--- /dev/null
+++ b/storage/src/vespa/storage/common/reindexing_constants.h
@@ -0,0 +1,8 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace storage {
+
+const char* reindexing_bucket_lock_bypass_value() noexcept;
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 1252e7388cf..0ad8118d925 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -83,7 +83,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
- _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg),
+ _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo,
+ _idealStateManager, _operationOwner, compReg),
_threadPool(threadPool),
_initializingIsUp(true),
_doneInitializeHandler(doneInitHandler),
@@ -211,6 +212,7 @@ void Distributor::onClose() {
}
LOG(debug, "Distributor::onClose invoked");
+ _pendingMessageTracker.abort_deferred_tasks();
_bucketDBUpdater.flush();
_externalOperationHandler.close_pending();
_operationOwner.onClose();
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index deb5fff8204..b9eaebb0bd1 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -11,8 +11,10 @@
#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storage/distributor/operations/external/statbucketoperation.h>
#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h>
+#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
#include <vespa/storage/distributor/operations/external/removelocationoperation.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
@@ -55,11 +57,13 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator& gen,
+ OperationOwner& operation_owner,
DistributorComponentRegister& compReg)
: DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"),
_direct_dispatch_sender(std::make_unique<DirectDispatchSender>(owner)),
_operationGenerator(gen),
_rejectFeedBeforeTimeReached(), // At epoch
+ _distributor_operation_owner(operation_owner),
_non_main_thread_ops_mutex(),
_non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()),
_concurrent_gets_enabled(false),
@@ -241,8 +245,16 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op(
}
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
-{
+namespace {
+
+bool put_is_allowed_through_bucket_lock(const api::PutCommand& cmd) {
+ const auto& tas_cond = cmd.getCondition();
+ return (tas_cond.isPresent() && (tas_cond.getSelection() == reindexing_bucket_lock_bypass_value()));
+}
+
+}
+
+bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) {
auto& metrics = getMetrics().puts;
if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) {
return true;
@@ -252,11 +264,23 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
cmd->setTimestamp(getUniqueTimestamp());
}
- auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
- if (allowMutation(handle)) {
- document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace();
+ const auto bucket_space = cmd->getBucket().getBucketSpace();
+ auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId());
+ bool allow = allowMutation(handle);
+ if (put_is_allowed_through_bucket_lock(*cmd)) {
+ if (!allow && handle.is_blocked_by(SequencingHandle::BlockedBy::LockedBucket)) {
+ cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op
+ allow = true;
+ } else {
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
+ "Operation expects a read-for-write bucket lock to be present, "
+ "but none currently exists"));
+ return true;
+ }
+ }
+ if (allow) {
_op = std::make_shared<PutOperation>(*this, *this,
- _bucketSpaceRepo.get(bucketSpace),
+ _bucketSpaceRepo.get(bucket_space),
std::move(cmd), getMetrics().puts, std::move(handle));
} else {
sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
@@ -266,8 +290,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update)
-{
+bool ExternalOperationHandler::onUpdate(const std::shared_ptr<api::UpdateCommand>& cmd) {
auto& metrics = getMetrics().updates;
if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) {
return true;
@@ -276,11 +299,11 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update)
if (cmd->getTimestamp() == 0) {
cmd->setTimestamp(getUniqueTimestamp());
}
- auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
+ const auto bucket_space = cmd->getBucket().getBucketSpace();
+ auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId());
if (allowMutation(handle)) {
- document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace();
_op = std::make_shared<TwoPhaseUpdateOperation>(*this, *this, *this,
- _bucketSpaceRepo.get(bucketSpace),
+ _bucketSpaceRepo.get(bucket_space),
std::move(cmd), getMetrics(), std::move(handle));
} else {
sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
@@ -290,8 +313,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update)
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove)
-{
+bool ExternalOperationHandler::onRemove(const std::shared_ptr<api::RemoveCommand>& cmd) {
auto& metrics = getMetrics().removes;
if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) {
return true;
@@ -300,9 +322,10 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove)
if (cmd->getTimestamp() == 0) {
cmd->setTimestamp(getUniqueTimestamp());
}
- auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
+ const auto bucket_space = cmd->getBucket().getBucketSpace();
+ auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId());
if (allowMutation(handle)) {
- auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
+ auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket_space));
_op = std::make_shared<RemoveOperation>(*this, *this, distributorBucketSpace, std::move(cmd),
getMetrics().removes, std::move(handle));
@@ -313,8 +336,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove)
return true;
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
-{
+bool ExternalOperationHandler::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd) {
document::BucketId bid;
RemoveLocationOperation::getBucketId(*this, *this, *cmd, bid);
document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid);
@@ -359,14 +381,12 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(
desired_get_read_consistency());
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
-{
+bool ExternalOperationHandler::onGet(const std::shared_ptr<api::GetCommand>& cmd) {
_op = try_generate_get_operation(cmd);
return true;
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket)
-{
+bool ExternalOperationHandler::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd) {
auto& metrics = getMetrics().stats;
bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
@@ -375,8 +395,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket)
return true;
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList)
-{
+bool ExternalOperationHandler::onGetBucketList(const std::shared_ptr<api::GetBucketListCommand>& cmd) {
auto& metrics = getMetrics().getbucketlists;
bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
@@ -386,13 +405,19 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList)
return true;
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor)
-{
+bool ExternalOperationHandler::onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>& cmd) {
// TODO same handling as Gets (VisitorOperation needs to change)
const DistributorConfiguration& config(getDistributor().getConfig());
VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor());
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = Operation::SP(new VisitorOperation(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits));
+ auto visit_op = std::make_shared<VisitorOperation>(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits);
+ if (visit_op->is_read_for_write()) {
+ _op = std::make_shared<ReadForWriteVisitorOperationStarter>(std::move(visit_op), _operation_sequencer,
+ _distributor_operation_owner,
+ getDistributor().getPendingMessageTracker());
+ } else {
+ _op = std::move(visit_op);
+ }
return true;
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 60cad15a791..00dc9a49d74 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -11,6 +11,8 @@
#include <chrono>
#include <mutex>
+namespace documentapi { class TestAndSetCondition; }
+
namespace storage {
class PersistenceOperationMetricSet;
@@ -21,6 +23,7 @@ class DistributorMetricSet;
class Distributor;
class MaintenanceOperationGenerator;
class DirectDispatchSender;
+class OperationOwner;
class ExternalOperationHandler : public DistributorComponent,
public api::MessageHandler
@@ -29,19 +32,20 @@ public:
using Clock = std::chrono::system_clock;
using TimePoint = std::chrono::time_point<Clock>;
- DEF_MSG_COMMAND_H(Get);
- DEF_MSG_COMMAND_H(Put);
- DEF_MSG_COMMAND_H(Update);
- DEF_MSG_COMMAND_H(Remove);
- DEF_MSG_COMMAND_H(RemoveLocation);
- DEF_MSG_COMMAND_H(StatBucket);
- DEF_MSG_COMMAND_H(CreateVisitor);
- DEF_MSG_COMMAND_H(GetBucketList);
+ bool onGet(const std::shared_ptr<api::GetCommand>&) override;
+ bool onPut(const std::shared_ptr<api::PutCommand>&) override;
+ bool onUpdate(const std::shared_ptr<api::UpdateCommand>&) override;
+ bool onRemove(const std::shared_ptr<api::RemoveCommand>&) override;
+ bool onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>&) override;
+ bool onStatBucket(const std::shared_ptr<api::StatBucketCommand>&) override;
+ bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override;
+ bool onGetBucketList(const std::shared_ptr<api::GetBucketListCommand>&) override;
ExternalOperationHandler(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- const MaintenanceOperationGenerator&,
+ const MaintenanceOperationGenerator& gen,
+ OperationOwner& operation_owner,
DistributorComponentRegister& compReg);
~ExternalOperationHandler() override;
@@ -74,12 +78,18 @@ public:
return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed);
}
+ // Exposed for testing
+ OperationSequencer& operation_sequencer() noexcept {
+ return _operation_sequencer;
+ }
+
private:
std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
const MaintenanceOperationGenerator& _operationGenerator;
- OperationSequencer _mutationSequencer;
+ OperationSequencer _operation_sequencer;
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
+ OperationOwner& _distributor_operation_owner;
mutable std::mutex _non_main_thread_ops_mutex;
OperationOwner _non_main_thread_ops_owner;
std::atomic<bool> _concurrent_gets_enabled;
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
index d268028de78..aa48cb6c950 100644
--- a/storage/src/vespa/storage/distributor/operation_sequencer.cpp
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
@@ -4,8 +4,7 @@
#include <vespa/document/base/documentid.h>
#include <cassert>
-namespace storage {
-namespace distributor {
+namespace storage::distributor {
void SequencingHandle::release() {
if (valid()) {
@@ -14,27 +13,49 @@ void SequencingHandle::release() {
}
}
-OperationSequencer::OperationSequencer() {
-}
-
-OperationSequencer::~OperationSequencer() {
-}
+OperationSequencer::OperationSequencer() = default;
+OperationSequencer::~OperationSequencer() = default;
-SequencingHandle OperationSequencer::try_acquire(const document::DocumentId& id) {
+SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id) {
const document::GlobalId gid(id.getGlobalId());
+ if (!_active_buckets.empty()) {
+ auto doc_bucket_id = gid.convertToBucketId();
+ // TODO avoid O(n), but sub bucket resolving is tricky and we expect the number
+ // of locked buckets to be in the range of 0 to <very small number>.
+ for (const auto& entry : _active_buckets) {
+ if ((entry.getBucketSpace() == bucket_space)
+ && entry.getBucketId().contains(doc_bucket_id))
+ {
+ return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket);
+ }
+ }
+ }
const auto inserted = _active_gids.insert(gid);
if (inserted.second) {
return SequencingHandle(*this, gid);
} else {
- return SequencingHandle();
+ return SequencingHandle(SequencingHandle::BlockedBy::PendingOperation);
+ }
+}
+
+SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket) {
+ const auto inserted = _active_buckets.insert(bucket);
+ if (inserted.second) {
+ return SequencingHandle(*this, bucket);
+ } else {
+ return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket);
}
}
void OperationSequencer::release(const SequencingHandle& handle) {
assert(handle.valid());
- _active_gids.erase(handle.gid());
+ if (handle.has_gid()) {
+ _active_gids.erase(handle.gid());
+ } else {
+ assert(handle.has_bucket());
+ [[maybe_unused]] auto erased = _active_buckets.erase(handle.bucket());
+ assert(erased == 1u);
+ }
}
-} // distributor
-} // storage
-
+} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h
index 7a523b61e53..b813071a1c8 100644
--- a/storage/src/vespa/storage/distributor/operation_sequencer.h
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.h
@@ -2,8 +2,11 @@
#pragma once
#include <vespa/document/base/globalid.h>
+#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/stllike/hash_set.h>
+#include <set>
#include <utility>
+#include <variant>
namespace document {
class DocumentId;
@@ -15,19 +18,42 @@ class OperationSequencer;
/**
* Represents a move-only handle which effectively holds a guard for
- * allowing sequenced operations towards a particular document ID.
+ * allowing sequenced operations towards a particular document ID or
+ * bucket ID.
*
* Destroying a handle will implicitly release the guard, allowing
* new sequenced operations towards the ID.
*/
class SequencingHandle {
+public:
+ enum class BlockedBy {
+ PendingOperation,
+ LockedBucket
+ };
+private:
OperationSequencer* _sequencer;
- document::GlobalId _gid;
+ using HandleVariant = std::variant<document::Bucket, document::GlobalId, BlockedBy>;
+ HandleVariant _handle;
public:
- SequencingHandle() noexcept : _sequencer(nullptr) {}
- SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid)
- : _sequencer(&sequencer),
- _gid(gid)
+ SequencingHandle() noexcept
+ : _sequencer(nullptr),
+ _handle()
+ {}
+
+ explicit SequencingHandle(BlockedBy blocked_by)
+ : _sequencer(nullptr),
+ _handle(blocked_by)
+ {}
+
+ SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid) noexcept
+ : _sequencer(&sequencer),
+ _handle(gid)
+ {
+ }
+
+ SequencingHandle(OperationSequencer& sequencer, const document::Bucket& bucket)
+ : _sequencer(&sequencer),
+ _handle(bucket)
{
}
@@ -39,8 +65,8 @@ public:
SequencingHandle& operator=(const SequencingHandle&) = delete;
SequencingHandle(SequencingHandle&& rhs) noexcept
- : _sequencer(rhs._sequencer),
- _gid(rhs._gid)
+ : _sequencer(rhs._sequencer),
+ _handle(std::move(rhs._handle))
{
rhs._sequencer = nullptr;
}
@@ -48,13 +74,33 @@ public:
SequencingHandle& operator=(SequencingHandle&& rhs) noexcept {
if (&rhs != this) {
std::swap(_sequencer, rhs._sequencer);
- std::swap(_gid, rhs._gid);
+ std::swap(_handle, rhs._handle);
}
return *this;
}
- bool valid() const noexcept { return (_sequencer != nullptr); }
- const document::GlobalId& gid() const noexcept { return _gid; }
+ [[nodiscard]] bool valid() const noexcept { return (_sequencer != nullptr); }
+ [[nodiscard]] bool is_blocked() const noexcept {
+ return std::holds_alternative<BlockedBy>(_handle);
+ }
+ [[nodiscard]] BlockedBy blocked_by() const noexcept {
+ return std::get<BlockedBy>(_handle); // FIXME can actually throw
+ }
+ [[nodiscard]] bool is_blocked_by(BlockedBy blocker) const noexcept {
+ return (is_blocked() && blocked_by() == blocker);
+ }
+ [[nodiscard]] bool has_bucket() const noexcept {
+ return std::holds_alternative<document::Bucket>(_handle);
+ }
+ const document::Bucket& bucket() const noexcept {
+ return std::get<document::Bucket>(_handle); // FIXME can actually throw
+ }
+ [[nodiscard]] bool has_gid() const noexcept {
+ return std::holds_alternative<document::GlobalId>(_handle);
+ }
+ const document::GlobalId& gid() const noexcept {
+ return std::get<document::GlobalId>(_handle); // FIXME can actually throw
+ }
void release();
};
@@ -68,7 +114,10 @@ public:
*/
class OperationSequencer {
using GidSet = vespalib::hash_set<document::GlobalId, document::GlobalId::hash>;
- GidSet _active_gids;
+ using BucketSet = std::set<document::Bucket>;
+
+ GidSet _active_gids;
+ BucketSet _active_buckets;
friend class SequencingHandle;
public:
@@ -76,8 +125,11 @@ public:
~OperationSequencer();
// Returns a handle with valid() == true iff no concurrent operations are
- // already active for `id`.
- SequencingHandle try_acquire(const document::DocumentId& id);
+ // already active for `id` _and_ the there are no active bucket locks for
+ // any bucket that may contain `id`.
+ SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id);
+
+ SequencingHandle try_acquire(const document::Bucket& bucket);
private:
void release(const SequencingHandle& handle);
};
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index 39881550466..37412b08d2e 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -59,7 +59,7 @@ public:
: _sender(sender),
_clock(clock) {
}
- ~OperationOwner();
+ ~OperationOwner() override;
/**
Handles replies from storage, mapping from a message id to an operation.
@@ -82,6 +82,8 @@ public:
*/
void erase(api::StorageMessage::Id msgId);
+ [[nodiscard]] DistributorMessageSender& sender() noexcept { return _sender; }
+
void onClose();
uint32_t size() const { return _sentMessageMap.size(); }
std::string toString() const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
index bb2992034f7..57a5a6cdde5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(storage_distributoroperationexternal OBJECT
getoperation.cpp
newest_replica.cpp
putoperation.cpp
+ read_for_write_visitor_operation.cpp
removelocationoperation.cpp
removeoperation.cpp
statbucketlistoperation.cpp
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
new file mode 100644
index 00000000000..1e87172e870
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
@@ -0,0 +1,74 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "read_for_write_visitor_operation.h"
+#include "visitoroperation.h"
+#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storage/distributor/operationowner.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".operations.external.read_for_write_visitor_operation");
+
+namespace storage::distributor {
+
+ReadForWriteVisitorOperationStarter::ReadForWriteVisitorOperationStarter(
+ std::shared_ptr<VisitorOperation> visitor_op,
+ OperationSequencer& operation_sequencer,
+ OperationOwner& stable_operation_owner,
+ PendingMessageTracker& message_tracker)
+ : _visitor_op(std::move(visitor_op)),
+ _operation_sequencer(operation_sequencer),
+ _stable_operation_owner(stable_operation_owner),
+ _message_tracker(message_tracker)
+{
+}
+
+ReadForWriteVisitorOperationStarter::~ReadForWriteVisitorOperationStarter() = default;
+
+void ReadForWriteVisitorOperationStarter::onClose(DistributorMessageSender& sender) {
+ _visitor_op->onClose(sender);
+}
+
+void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& sender) {
+ if (_visitor_op->verify_command_and_expand_buckets(sender)) {
+ assert(!_visitor_op->has_sent_reply());
+ auto maybe_bucket = _visitor_op->first_bucket_to_visit();
+ if (!maybe_bucket) {
+ LOG(debug, "No buckets found to visit, tagging visitor complete");
+ // No buckets to be found, start op to trigger immediate reply.
+ _visitor_op->start(sender, _startTime);
+ assert(_visitor_op->has_sent_reply());
+ return;
+ }
+ auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket);
+ if (!bucket_handle.valid()) {
+ LOG(debug, "An operation is already pending for bucket %s, failing visitor",
+ maybe_bucket->toString().c_str());
+ _visitor_op->fail_with_bucket_already_locked(sender);
+ return;
+ }
+ LOG(debug, "Possibly deferring start of visitor for bucket %s", maybe_bucket->toString().c_str());
+ _message_tracker.run_once_no_pending_for_bucket(
+ *maybe_bucket,
+ make_deferred_task([self = shared_from_this(), handle = std::move(bucket_handle)](TaskRunState state) mutable {
+ LOG(debug, "Starting deferred visitor");
+ self->_visitor_op->assign_bucket_lock_handle(std::move(handle));
+ if (state == TaskRunState::OK) {
+ // Once started, ownership of _visitor_op will pass to the Distributor's OperationOwner
+ self->_stable_operation_owner.start(self->_visitor_op, 120/*TODO*/);
+ } else {
+ self->_visitor_op->onClose(self->_stable_operation_owner.sender());
+ }
+ }));
+ } else {
+ LOG(debug, "Failed verification of visitor, responding immediately");
+ assert(_visitor_op->has_sent_reply());
+ }
+}
+
+void ReadForWriteVisitorOperationStarter::onReceive(DistributorMessageSender& sender,
+ const std::shared_ptr<api::StorageReply> & msg) {
+ _visitor_op->onReceive(sender, msg);
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
new file mode 100644
index 00000000000..06b4f60307e
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/distributor/operations/operation.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
+#include <memory>
+
+namespace storage::distributor {
+
+class PendingMessageTracker;
+class VisitorOperation;
+class OperationOwner;
+
+/**
+ * Operation starting indirection for a visitor operation that has the semantics
+ * of an exclusive bucket lock. Such operations can only resolve to a single
+ * super-bucket/sub-bucket pair and care should be taken to avoid starving client
+ * operations through long-running locks.
+ *
+ * Operation starting may be deferred to the PendingMessageTracker if there are
+ * pending operations to the sub-bucket when onStart is called. If so, the deferred
+ * operation start takes place automatically and immediately when all pending
+ * bucket operations have completed. These will be started in the context of the
+ * OperationOwner provided to the operation.
+ */
+class ReadForWriteVisitorOperationStarter
+ : public Operation,
+ public std::enable_shared_from_this<ReadForWriteVisitorOperationStarter>
+{
+ std::shared_ptr<VisitorOperation> _visitor_op;
+ OperationSequencer& _operation_sequencer;
+ OperationOwner& _stable_operation_owner;
+ PendingMessageTracker& _message_tracker;
+public:
+ ReadForWriteVisitorOperationStarter(std::shared_ptr<VisitorOperation> visitor_op,
+ OperationSequencer& operation_sequencer,
+ OperationOwner& stable_operation_owner,
+ PendingMessageTracker& message_tracker);
+ ~ReadForWriteVisitorOperationStarter() override;
+
+ const char* getName() const override { return "ReadForWriteVisitorOperationStarter"; }
+ void onClose(DistributorMessageSender& sender) override;
+ void onStart(DistributorMessageSender& sender) override;
+ void onReceive(DistributorMessageSender& sender,
+ const std::shared_ptr<api::StorageReply> & msg) override;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index c35a6671c8d..cf6922b8606 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -43,6 +43,24 @@ VisitorOperation::BucketInfo::toString() const
VisitorOperation::SuperBucketInfo::~SuperBucketInfo() = default;
+namespace {
+
+[[nodiscard]] bool
+matches_visitor_library(vespalib::stringref input, vespalib::stringref expected)
+{
+ if (input.size() != expected.size()) {
+ return false;
+ }
+ for (size_t i = 0; i < input.size(); ++i) {
+ if (static_cast<char>(std::tolower(static_cast<unsigned char>(input[i]))) != expected[i]) {
+ return false;
+ }
+ }
+ return true;
+}
+
+}
+
VisitorOperation::VisitorOperation(
DistributorNodeContext& node_ctx,
DistributorOperationContext& op_ctx,
@@ -55,11 +73,14 @@ VisitorOperation::VisitorOperation(
_op_ctx(op_ctx),
_bucketSpace(bucketSpace),
_msg(m),
- _sentReply(false),
_config(config),
_metrics(metrics),
_trace(TRACE_SOFT_MEMORY_LIMIT),
- _operationTimer(_node_ctx.clock())
+ _operationTimer(_node_ctx.clock()),
+ _bucket_lock(), // Initially no lock is held
+ _sentReply(false),
+ _verified_and_expanded(false),
+ _is_read_for_write(matches_visitor_library(_msg->getLibraryName(), "reindexingvisitor"))
{
const std::vector<document::BucketId>& buckets = m->getBuckets();
@@ -245,7 +266,7 @@ VisitorOperation::verifyVisitorDistributionBitCount(
const document::BucketId& bid)
{
const lib::ClusterState& clusterState = _bucketSpace.getClusterState();
- if (_msg->getDocumentSelection().length() == 0
+ if (_msg->getDocumentSelection().empty()
&& bid.getUsedBits() != clusterState.getDistributionBitCount())
{
LOG(debug,
@@ -332,6 +353,13 @@ VisitorOperation::verifyCreateVisitorCommand(DistributorMessageSender& sender)
verifyOperationContainsBuckets();
verifyOperationHasSuperbucketAndProgress();
verifyOperationSentToCorrectDistributor();
+ // TODO wrap and test
+ if (is_read_for_write() && (_msg->getMaxBucketsPerVisitor() != 1)) {
+ throw VisitorVerificationException(
+ api::ReturnCode::ILLEGAL_PARAMETERS,
+ vespalib::make_string("Read-for-write visitors can only have 1 max pending bucket, was %u",
+ _msg->getMaxBucketsPerVisitor()));
+ }
return true;
} catch (const VisitorVerificationException& e) {
LOG(debug,
@@ -382,14 +410,6 @@ VisitorOperation::pickBucketsToVisit(const std::vector<BucketDatabase::Entry>& b
}
bool
-VisitorOperation::expandBucketAll()
-{
- std::vector<BucketDatabase::Entry> entries;
- _bucketSpace.getBucketDatabase().getAll(_superBucket.bid, entries);
- return pickBucketsToVisit(entries);
-}
-
-bool
VisitorOperation::expandBucketContaining()
{
std::vector<BucketDatabase::Entry> entries;
@@ -568,13 +588,24 @@ VisitorOperation::pickTargetNode(
void
VisitorOperation::onStart(DistributorMessageSender& sender)
{
- if (!verifyCreateVisitorCommand(sender)) {
- return;
+ if (!_verified_and_expanded) {
+ if (!verify_command_and_expand_buckets(sender)) {
+ return;
+ }
}
+ startNewVisitors(sender);
+}
+bool
+VisitorOperation::verify_command_and_expand_buckets(DistributorMessageSender& sender)
+{
+ assert(!_verified_and_expanded);
+ _verified_and_expanded = true;
+ if (!verifyCreateVisitorCommand(sender)) {
+ return false;
+ }
expandBucket();
-
- startNewVisitors(sender);
+ return true;
}
bool
@@ -770,6 +801,7 @@ VisitorOperation::sendStorageVisitor(uint16_t node,
uint32_t pending,
DistributorMessageSender& sender)
{
+ // TODO rewrite to not use copy ctor and remove wonky StorageCommand copy ctor impl
auto cmd = std::make_shared<api::CreateVisitorCommand>(*_msg);
cmd->getBuckets() = buckets;
@@ -848,4 +880,26 @@ VisitorOperation::onClose(DistributorMessageSender& sender)
sendReply(api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"), sender);
}
+void
+VisitorOperation::fail_with_bucket_already_locked(DistributorMessageSender& sender)
+{
+ assert(is_read_for_write());
+ sendReply(api::ReturnCode(api::ReturnCode::BUSY, "This bucket is already locked by another operation"), sender);
+}
+
+std::optional<document::Bucket>
+VisitorOperation::first_bucket_to_visit() const
+{
+ if (_superBucket.subBuckets.empty()) {
+ return {};
+ }
+ return {document::Bucket(_msg->getBucketSpace(), _superBucket.subBuckets.begin()->first)};
+}
+
+void
+VisitorOperation::assign_bucket_lock_handle(SequencingHandle handle)
+{
+ _bucket_lock = std::move(handle);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index 28be7dfe353..12164e3cba9 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -2,12 +2,14 @@
#pragma once
#include <vespa/storage/distributor/operations/operation.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storage/visiting/memory_bounded_trace.h>
#include <vespa/storageapi/defs.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageframework/generic/clock/timer.h>
+#include <optional>
namespace document { class Document; }
@@ -36,7 +38,7 @@ public:
VisitorOperation(DistributorNodeContext& node_ctx,
DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
- const std::shared_ptr<api::CreateVisitorCommand> & msg,
+ const std::shared_ptr<api::CreateVisitorCommand>& msg,
const Config& config,
VisitorMetricSet& metrics);
@@ -47,9 +49,19 @@ public:
void onReceive(DistributorMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg) override;
+ // Only valid to call if is_read_for_write() == true
+ void fail_with_bucket_already_locked(DistributorMessageSender& sender);
+
+ [[nodiscard]] bool verify_command_and_expand_buckets(DistributorMessageSender& sender);
+
const char* getName() const override { return "visit"; }
std::string getStatus() const override { return ""; }
+ [[nodiscard]] bool has_sent_reply() const noexcept { return _sentReply; }
+ [[nodiscard]] bool is_read_for_write() const noexcept { return _is_read_for_write; }
+ [[nodiscard]] std::optional<document::Bucket> first_bucket_to_visit() const;
+ void assign_bucket_lock_handle(SequencingHandle handle);
+
private:
struct BucketInfo {
bool done;
@@ -63,7 +75,7 @@ private:
vespalib::string toString() const;
};
- typedef std::map<document::BucketId, BucketInfo> VisitBucketMap;
+ using VisitBucketMap = std::map<document::BucketId, BucketInfo>;
struct SuperBucketInfo {
document::BucketId bid;
@@ -71,7 +83,7 @@ private:
VisitBucketMap subBuckets;
std::vector<document::BucketId> subBucketsVisitOrder;
- SuperBucketInfo(const document::BucketId& b = document::BucketId(0))
+ explicit SuperBucketInfo(const document::BucketId& b = document::BucketId(0))
: bid(b),
subBucketsCompletelyExpanded(false)
{
@@ -80,8 +92,8 @@ private:
};
- typedef std::map<uint16_t, std::vector<document::BucketId> > NodeToBucketsMap;
- typedef std::map<uint64_t, api::CreateVisitorCommand::SP> SentMessagesMap;
+ using NodeToBucketsMap = std::map<uint16_t, std::vector<document::BucketId>>;
+ using SentMessagesMap = std::map<uint64_t, api::CreateVisitorCommand::SP>;
void sendReply(const api::ReturnCode& code, DistributorMessageSender& sender);
void updateReplyMetrics(const api::ReturnCode& result);
@@ -94,15 +106,12 @@ private:
void verifyOperationSentToCorrectDistributor();
bool verifyCreateVisitorCommand(DistributorMessageSender& sender);
bool pickBucketsToVisit(const std::vector<BucketDatabase::Entry>& buckets);
- bool expandBucketAll();
bool expandBucketContaining();
bool expandBucketContained();
void expandBucket();
int pickTargetNode(
const BucketDatabase::Entry& entry,
const std::vector<uint16_t>& triedNodes);
- void attemptToParseOrderingSelector();
- bool documentSelectionMayHaveOrdering() const;
bool maySendNewStorageVisitors() const noexcept;
void startNewVisitors(DistributorMessageSender& sender);
void initializeActiveNodes();
@@ -148,7 +157,6 @@ private:
api::CreateVisitorCommand::SP _msg;
api::ReturnCode _storageError;
- bool _sentReply;
SuperBucketInfo _superBucket;
document::BucketId _lastBucket;
@@ -167,11 +175,13 @@ private:
static constexpr size_t TRACE_SOFT_MEMORY_LIMIT = 65536;
- bool done();
- bool hasNoPendingMessages();
document::BucketId getLastBucketVisited();
mbus::TraceNode trace;
framework::MilliSecTimer _operationTimer;
+ SequencingHandle _bucket_lock;
+ bool _sentReply;
+ bool _verified_and_expanded;
+ bool _is_read_for_write;
};
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 715ac1722b4..8027b5349f9 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -71,7 +71,7 @@ pairAsRange(Pair pair)
std::vector<uint64_t>
PendingMessageTracker::clearMessagesForNode(uint16_t node)
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
MessagesByNodeAndBucket& idx(boost::multi_index::get<1>(_messages));
auto range = pairAsRange(idx.equal_range(boost::make_tuple(node)));
@@ -88,7 +88,7 @@ PendingMessageTracker::clearMessagesForNode(uint16_t node)
void
PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg)
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
if (msg->getAddress()) {
_messages.emplace(currentTime(), msg->getType().getId(), msg->getPriority(), msg->getMsgId(),
msg->getBucket(), msg->getAddress()->getIndex());
@@ -103,7 +103,7 @@ PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg)
document::Bucket
PendingMessageTracker::reply(const api::StorageReply& r)
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::unique_lock guard(_lock);
document::Bucket bucket;
LOG(debug, "Got reply: %s", r.toString().c_str());
@@ -121,6 +121,16 @@ PendingMessageTracker::reply(const api::StorageReply& r)
}
LOG(debug, "Erased message with id %" PRIu64, msgId);
msgs.erase(msgId);
+ auto deferred_tasks = get_deferred_ops_if_bucket_pending_drained(bucket);
+ // Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker.
+ // To avoid deadlocking, we run the tasks outside the lock.
+ // TODO remove locking entirely... Only exists for status pages!
+ guard.unlock();
+ // We expect this to be "effectively noexcept", i.e. any tasks throwing an
+ // exception will end up nuking the distributor process from the unwind.
+ for (auto& task : deferred_tasks) {
+ task->run(TaskRunState::OK);
+ }
}
return bucket;
@@ -129,6 +139,57 @@ PendingMessageTracker::reply(const api::StorageReply& r)
namespace {
template <typename Range>
+bool is_empty_range(const Range& range) noexcept {
+ return (range.first == range.second);
+}
+
+}
+
+std::vector<std::unique_ptr<DeferredTask>>
+PendingMessageTracker::get_deferred_ops_if_bucket_pending_drained(const document::Bucket& bucket)
+{
+ if (_deferred_bucket_tasks.empty()) {
+ return {};
+ }
+ std::vector<std::unique_ptr<DeferredTask>> tasks;
+ auto& bucket_idx = boost::multi_index::get<2>(_messages);
+ auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
+ if (is_empty_range(pending_tasks_for_bucket)) {
+ auto waiting_tasks = _deferred_bucket_tasks.equal_range(bucket);
+ for (auto task_iter = waiting_tasks.first; task_iter != waiting_tasks.second; ++task_iter) {
+ tasks.emplace_back(std::move(task_iter->second));
+ }
+ _deferred_bucket_tasks.erase(waiting_tasks.first, waiting_tasks.second);
+ }
+ return tasks;
+}
+
+void
+PendingMessageTracker::run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task)
+{
+ std::unique_lock guard(_lock);
+ auto& bucket_idx = boost::multi_index::get<2>(_messages);
+ const auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
+ if (is_empty_range(pending_tasks_for_bucket)) {
+ guard.unlock(); // Must not be held whilst running task, or else recursive sends will deadlock.
+ task->run(TaskRunState::OK); // Nothing pending, run immediately.
+ } else {
+ _deferred_bucket_tasks.emplace(bucket, std::move(task));
+ }
+}
+
+void
+PendingMessageTracker::abort_deferred_tasks()
+{
+ std::lock_guard guard(_lock);
+ for (auto& task : _deferred_bucket_tasks) {
+ task.second->run(TaskRunState::Aborted);
+ }
+}
+
+namespace {
+
+template <typename Range>
void
runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range)
{
@@ -144,7 +205,7 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range)
void
PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucket &bucket, Checker& checker) const
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket)));
@@ -154,7 +215,7 @@ PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucke
void
PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Checker& checker) const
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages));
auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket)));
@@ -164,7 +225,7 @@ PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Chec
bool
PendingMessageTracker::hasPendingMessage(uint16_t node, const document::Bucket &bucket, uint32_t messageType) const
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
auto range = msgs.equal_range(boost::make_tuple(node, bucket, messageType));
@@ -181,7 +242,7 @@ PendingMessageTracker::getStatusStartPage(std::ostream& out) const
void
PendingMessageTracker::getStatusPerBucket(std::ostream& out) const
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages);
using BucketMap = std::map<document::Bucket, std::vector<vespalib::string>>;
BucketMap perBucketMsgs;
@@ -210,7 +271,7 @@ PendingMessageTracker::getStatusPerBucket(std::ostream& out) const
void
PendingMessageTracker::getStatusPerNode(std::ostream& out) const
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages);
int lastNode = -1;
for (const auto & node : msgs) {
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 2dba244dc9f..39ea5c9c1a6 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -15,7 +15,6 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/composite_key.hpp>
-
#include <set>
#include <unordered_map>
#include <chrono>
@@ -23,14 +22,50 @@
namespace storage::distributor {
-class PendingMessageTracker : public framework::HtmlStatusReporter
-{
+/**
+ * Since the state a deferred task depends on may have changed between the
+ * time a task was scheduled and when it's actually executed, this enum provides
+ * a means of communicating if a task should be started as normal.
+ */
+enum class TaskRunState {
+ OK, // Task may be started as normal
+ Aborted, // Task should trigger an immediate abort behavior (distributor is shutting down)
+ BucketLost // Task should trigger an immediate abort behavior (bucket no longer present on node)
+};
+
+/**
+ * Represents an arbitrary task whose execution may be deferred until no
+ * further pending operations are present.
+ */
+struct DeferredTask {
+ virtual ~DeferredTask() = default;
+ virtual void run(TaskRunState state) = 0;
+};
+template <typename Func>
+class LambdaDeferredTask : public DeferredTask {
+ Func _func;
+public:
+ explicit LambdaDeferredTask(Func&& f) : _func(std::move(f)) {}
+ LambdaDeferredTask(const LambdaDeferredTask&) = delete;
+ LambdaDeferredTask(LambdaDeferredTask&&) = delete;
+ ~LambdaDeferredTask() override = default;
+
+ void run(TaskRunState state) override {
+ _func(state);
+ }
+};
+
+template <typename Func>
+std::unique_ptr<DeferredTask> make_deferred_task(Func&& f) {
+ return std::make_unique<LambdaDeferredTask<std::decay_t<Func>>>(std::forward<Func>(f));
+}
+
+class PendingMessageTracker : public framework::HtmlStatusReporter {
public:
class Checker {
public:
- virtual ~Checker() {}
-
+ virtual ~Checker() = default;
virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0;
};
@@ -42,8 +77,8 @@ public:
*/
using TimePoint = std::chrono::milliseconds;
- PendingMessageTracker(framework::ComponentRegister&);
- ~PendingMessageTracker();
+ explicit PendingMessageTracker(framework::ComponentRegister&);
+ ~PendingMessageTracker() override;
void insert(const std::shared_ptr<api::StorageMessage>&);
document::Bucket reply(const api::StorageReply& reply);
@@ -89,6 +124,8 @@ public:
_nodeBusyDuration = secs;
}
+ void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task);
+ void abort_deferred_tasks();
private:
struct MessageEntry {
TimePoint timeStamp;
@@ -130,23 +167,29 @@ private:
{
};
- typedef boost::multi_index::multi_index_container <
+ using Messages = boost::multi_index::multi_index_container <
MessageEntry,
boost::multi_index::indexed_by<
boost::multi_index::ordered_unique<MessageIdKey>,
boost::multi_index::ordered_non_unique<CompositeNodeBucketKey>,
boost::multi_index::ordered_non_unique<CompositeBucketMsgNodeKey>
>
- > Messages;
-
- typedef Messages::nth_index<0>::type MessagesByMsgId;
- typedef Messages::nth_index<1>::type MessagesByNodeAndBucket;
- typedef Messages::nth_index<2>::type MessagesByBucketAndType;
-
- Messages _messages;
- framework::Component _component;
- NodeInfo _nodeInfo;
- std::chrono::seconds _nodeBusyDuration;
+ >;
+
+ using MessagesByMsgId = Messages::nth_index<0>::type;
+ using MessagesByNodeAndBucket = Messages::nth_index<1>::type;
+ using MessagesByBucketAndType = Messages::nth_index<2>::type;
+ using DeferredBucketTaskMap = std::unordered_multimap<
+ document::Bucket,
+ std::unique_ptr<DeferredTask>,
+ document::Bucket::hash
+ >;
+
+ Messages _messages;
+ framework::Component _component;
+ NodeInfo _nodeInfo;
+ std::chrono::seconds _nodeBusyDuration;
+ DeferredBucketTaskMap _deferred_bucket_tasks;
// Since distributor is currently single-threaded, this will only
// contend when status page is being accessed. It is, however, required
@@ -169,6 +212,8 @@ private:
void getStatusPerNode(std::ostream& out) const;
void getStatusPerBucket(std::ostream& out) const;
TimePoint currentTime() const;
+
+ std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_pending_drained(const document::Bucket&);
};
}
diff --git a/storage/src/vespa/storage/visiting/CMakeLists.txt b/storage/src/vespa/storage/visiting/CMakeLists.txt
index f8999cd7dcb..27c8b8853b8 100644
--- a/storage/src/vespa/storage/visiting/CMakeLists.txt
+++ b/storage/src/vespa/storage/visiting/CMakeLists.txt
@@ -6,6 +6,7 @@ vespa_add_library(storage_visitor OBJECT
dumpvisitorsingle.cpp
memory_bounded_trace.cpp
recoveryvisitor.cpp
+ reindexing_visitor.cpp
testvisitor.cpp
visitor.cpp
visitormanager.cpp
diff --git a/storage/src/vespa/storage/visiting/dumpvisitorsingle.h b/storage/src/vespa/storage/visiting/dumpvisitorsingle.h
index 4cc538d6fd7..ccfc8a5a3cf 100644
--- a/storage/src/vespa/storage/visiting/dumpvisitorsingle.h
+++ b/storage/src/vespa/storage/visiting/dumpvisitorsingle.h
@@ -26,11 +26,10 @@ struct DumpVisitorSingleFactory : public VisitorFactory {
VisitorEnvironment::UP
makeVisitorEnvironment(StorageComponent&) override {
- return VisitorEnvironment::UP(new VisitorEnvironment);
+ return std::make_unique<VisitorEnvironment>();
};
Visitor*
-
makeVisitor(StorageComponent& c, VisitorEnvironment&, const vdslib::Parameters& params) override {
return new DumpVisitorSingle(c, params);
}
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
new file mode 100644
index 00000000000..8d2b4736582
--- /dev/null
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
@@ -0,0 +1,36 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "reindexing_visitor.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/storage/common/reindexing_constants.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".visitor.instance.reindexing_visitor");
+
+namespace storage {
+
+ReindexingVisitor::ReindexingVisitor(StorageComponent& component)
+ : Visitor(component)
+{
+}
+
+void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
+ std::vector<spi::DocEntry::UP>& entries,
+ HitCounter& hitCounter)
+{
+ LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size());
+ for (auto& entry : entries) {
+ if (entry->isRemove()) {
+ // We don't reindex removed documents, as that would be very silly.
+ continue;
+ }
+ const uint32_t doc_size = entry->getDocumentSize();
+ hitCounter.addHit(*entry->getDocumentId(), doc_size);
+ auto msg = std::make_unique<documentapi::PutDocumentMessage>(entry->releaseDocument());
+ msg->setApproxSize(doc_size);
+ msg->setCondition(documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value()));
+ sendMessage(std::move(msg));
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h
new file mode 100644
index 00000000000..9be21f24f5f
--- /dev/null
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h
@@ -0,0 +1,36 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "visitor.h"
+
+namespace storage {
+
+/**
+ * A visitor instance that is intended to be used for background reindexing.
+ * Only meant to be run alongside distributor-level bucket locking support
+ * that prevents concurrent writes to documents in the visited bucket.
+ *
+ * The bucket lock is explicitly bypassed by the Puts sent by the visitor
+ * by having all these be augmented with a special TaS string that is
+ * recognized by the distributor.
+ */
+class ReindexingVisitor : public Visitor {
+public:
+ explicit ReindexingVisitor(StorageComponent& component);
+ ~ReindexingVisitor() override = default;
+
+private:
+ void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override;
+};
+
+struct ReindexingVisitorFactory : public VisitorFactory {
+ VisitorEnvironment::UP makeVisitorEnvironment(StorageComponent&) override {
+ return std::make_unique<VisitorEnvironment>();
+ };
+
+ Visitor* makeVisitor(StorageComponent& c, VisitorEnvironment&, const vdslib::Parameters&) override {
+ return new ReindexingVisitor(c);
+ }
+};
+
+}
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 7c138afeb0f..bc77fd268d5 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -6,6 +6,7 @@
#include "countvisitor.h"
#include "testvisitor.h"
#include "recoveryvisitor.h"
+#include "reindexing_visitor.h"
#include <vespa/storage/common/statusmessages.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/util/stringfmt.h>
@@ -55,6 +56,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
_visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>();
_visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>();
_visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>();
+ _visitorFactories["reindexingvisitor"] = std::make_shared<ReindexingVisitorFactory>();
_component.registerStatusPage(*this);
}