diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-01 09:34:29 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-01 15:12:54 +0000 |
commit | fb96b6e254ef367428374eb4f254c248666bcd11 (patch) | |
tree | 72f23afd99187cfb97c8d2089a5c3504f2b175ce | |
parent | 008477111ac92e2eec81596a5037a205bbea53ff (diff) |
Initial support for backend reindexing visitor functionality
Introduces the concept of a read-for-write visitor operation which
blocks all mutating operations from starting for a bucket being
visited. This read-for-write mode is used if (and only if) the visitor
library being specified by the client is "reindexingvisitor".
Since read-for-write visitors cannot race with concurrent write
operations, starting such visitors are deferred until no further
mutations are pending.
Also adds a basic reindexingvisitor implementation to the content node
which sends all documents as Puts containing a special TaS token
that will let the operation through even if a bucket is locked. This
token is cleared by the distributor before it is passed on to the
content nodes.
Note: this feature is not yet production ready. For now the following
caveats apply:
* Mutating vs non-mutating pending ops to a bucket are not tracked
separately, so it’s possible to starve a reindexing visitor by
sending constant pending read load, as read load is not blocked by
the operation sequencer.
* Ideal state operations towards locked buckets are not blocked, so
it's possible for e.g. a split to be sent for a bucket that is being
visited.
26 files changed, 914 insertions, 146 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 96df1a842a4..810ffb550bf 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -32,6 +32,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST pendingmessagetrackertest.cpp persistence_metrics_set_test.cpp putoperationtest.cpp + read_for_write_visitor_operation_test.cpp removebucketoperationtest.cpp removelocationtest.cpp removeoperationtest.cpp diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 9bd64876e98..b9b633d4525 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -5,7 +5,10 @@ #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/operations/external/getoperation.h> +#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h> +#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/visitor.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/update/documentupdate.h> #include <vespa/document/fieldset/fieldsets.h> @@ -560,6 +563,80 @@ TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_conf do_test_get_weak_consistency_is_propagated(true); } +struct OperationHandlerSequencingTest : ExternalOperationHandlerTest { + void SetUp() override { + set_up_distributor_for_sequencing_test(); + } + + static documentapi::TestAndSetCondition bucket_lock_bypass_tas_condition() { + return documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value()); + } +}; + +TEST_F(OperationHandlerSequencingTest, put_not_allowed_through_locked_bucket_if_special_tas_token_not_present) { + auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); + auto bucket = makeDocumentBucket(document::BucketId(16, 1)); + auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket); + ASSERT_TRUE(bucket_handle.valid()); + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); +} + +TEST_F(OperationHandlerSequencingTest, put_allowed_through_locked_bucket_if_special_tas_token_present) { + set_up_distributor_for_sequencing_test(); + + auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); + put->setCondition(bucket_lock_bypass_tas_condition()); + + auto bucket = makeDocumentBucket(document::BucketId(16, 1)); + auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket); + ASSERT_TRUE(bucket_handle.valid()); + + Operation::SP op; + ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(put, op)); +} + +TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_no_bucket_lock_present) { + auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); + put->setCondition(bucket_lock_bypass_tas_condition()); + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); + // TODO determine most appropriate error code here. Want to fail the bucket but + // not the entire visitor operation. Will likely need to be revisited (heh!) soon. + EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, " + "but none currently exists)", + _sender.reply(0)->getResult().toString()); +} + +// This test is a variation of the above, but whereas it tests the case where _no_ lock is +// present, this tests the case where a lock is present but it's not a bucket-level lock. +TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_document_lock_present) { + auto put = makePutCommand("testdoctype1", _dummy_id); + put->setCondition(bucket_lock_bypass_tas_condition()); + Operation::SP op; + ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op)); + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put))); + EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, " + "but none currently exists)", + _sender.reply(0)->getResult().toString()); +} + +TEST_F(OperationHandlerSequencingTest, reindexing_visitor_creates_read_for_write_operation) { + auto cmd = std::make_shared<api::CreateVisitorCommand>( + document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", ""); + Operation::SP op; + getExternalOperationHandler().handleMessage(cmd, op); + ASSERT_TRUE(op.get() != nullptr); + ASSERT_TRUE(dynamic_cast<ReadForWriteVisitorOperationStarter*>(op.get()) != nullptr); +} + +TEST_F(OperationHandlerSequencingTest, reindexing_visitor_library_check_is_case_insensitive) { + auto cmd = std::make_shared<api::CreateVisitorCommand>( + document::FixedBucketSpaces::default_space(), "ReIndexingVisitor", "foo", ""); + Operation::SP op; + getExternalOperationHandler().handleMessage(cmd, op); + ASSERT_TRUE(op.get() != nullptr); + ASSERT_TRUE(dynamic_cast<ReadForWriteVisitorOperationStarter*>(op.get()) != nullptr); +} + // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. Mapping the diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp index b3674ee2126..9d8553af0ae 100644 --- a/storage/src/tests/distributor/operation_sequencer_test.cpp +++ b/storage/src/tests/distributor/operation_sequencer_test.cpp @@ -1,46 +1,94 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/document/base/documentid.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/vespalib/gtest/gtest.h> namespace storage::distributor { using document::DocumentId; +using namespace ::testing; -TEST(OperationSequencerTest, can_get_sequencing_handle_for_id_without_existing_handle) { +namespace { + +constexpr document::BucketSpace default_space() { + return document::FixedBucketSpaces::default_space(); +} + +constexpr document::BucketSpace global_space() { + return document::FixedBucketSpaces::global_space(); +} + +} + +struct OperationSequencerTest : Test { OperationSequencer sequencer; - auto handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); +}; + +TEST_F(OperationSequencerTest, can_get_sequencing_handle_for_id_without_existing_handle) { + auto handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); EXPECT_TRUE(handle.valid()); + EXPECT_FALSE(handle.was_blocked()); } -TEST(OperationSequencerTest, cannot_get_sequencing_handle_for_id_with_existing_handle) { - OperationSequencer sequencer; - auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); - auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); +TEST_F(OperationSequencerTest, cannot_get_sequencing_handle_for_id_with_existing_handle) { + auto first_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); + auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); EXPECT_FALSE(second_handle.valid()); + ASSERT_TRUE(second_handle.was_blocked()); + EXPECT_EQ(second_handle.blocked_by(), SequencingHandle::BlockedBy::PendingOperation); } -TEST(OperationSequencerTest, can_get_sequencing_handle_for_different_ids) { - OperationSequencer sequencer; - auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); - auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::efgh")); +TEST_F(OperationSequencerTest, can_get_sequencing_handle_for_different_ids) { + auto first_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); + auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::efgh")); EXPECT_TRUE(first_handle.valid()); EXPECT_TRUE(second_handle.valid()); } -TEST(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_for_id) { - OperationSequencer sequencer; - auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); +TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_for_id) { + auto first_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); // Explicit release first_handle.release(); { - auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); + auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); EXPECT_TRUE(second_handle.valid()); // Implicit release by scope exit } - auto third_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd")); + auto third_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); EXPECT_TRUE(third_handle.valid()); } +TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) { + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + EXPECT_TRUE(bucket_handle.valid()); + auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); + EXPECT_FALSE(doc_handle.valid()); + ASSERT_TRUE(doc_handle.was_blocked()); + EXPECT_EQ(doc_handle.blocked_by(), SequencingHandle::BlockedBy::LockedBucket); +} + +TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bucket) { + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + EXPECT_TRUE(bucket_handle.valid()); + // Note: different sub-bucket than the lock + auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=2:abcd")); + EXPECT_TRUE(doc_handle.valid()); +} + +TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) { + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + bucket_handle.release(); + auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); + EXPECT_TRUE(doc_handle.valid()); +} + +TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) { + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + EXPECT_TRUE(bucket_handle.valid()); + auto doc_handle = sequencer.try_acquire(global_space(), DocumentId("id:foo:test:n=1:abcd")); + EXPECT_TRUE(doc_handle.valid()); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 90171db97d1..71b51c9a7b6 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -1,13 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/document/base/testdocman.h> +#include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> -#include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> #include <tests/common/dummystoragelink.h> -#include <vespa/document/test/make_document_bucket.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/gtest/gtest.h> #include <gmock/gmock.h> @@ -83,6 +83,16 @@ public: _tracker->reply(*putReply); } + std::shared_ptr<api::PutCommand> createPutToNode(uint16_t node) const { + document::BucketId bucket(16, 1234); + auto cmd = std::make_shared<api::PutCommand>( + makeDocumentBucket(bucket), + createDummyDocumentForBucket(bucket), + api::Timestamp(123456)); + cmd->setAddress(makeStorageAddress(node)); + return cmd; + } + PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } @@ -98,16 +108,6 @@ private: return _testDocMan.createDocument("foobar", createDummyIdString(bucket)); } - std::shared_ptr<api::PutCommand> createPutToNode(uint16_t node) const { - document::BucketId bucket(16, 1234); - auto cmd = std::make_shared<api::PutCommand>( - makeDocumentBucket(bucket), - createDummyDocumentForBucket(bucket), - api::Timestamp(123456)); - cmd->setAddress(makeStorageAddress(node)); - return cmd; - } - std::shared_ptr<api::RemoveCommand> createRemoveToNode( uint16_t node) const { @@ -436,4 +436,49 @@ TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) { EXPECT_FALSE(f.tracker().getNodeInfo().isBusy(0)); } +namespace { + +document::BucketId bucket_of(const document::DocumentId& id) { + return document::BucketId(16, id.getGlobalId().convertToBucketId().getId()); +} + +} + +TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ops) { + Fixture f; + auto cmd = f.createPutToNode(0); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::Aborted; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::OK); +} + +TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::Aborted; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::Aborted); + f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be run as part of this. + EXPECT_EQ(state, TaskRunState::OK); +} + +TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::OK; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::OK); + f.tracker().abort_deferred_tasks(); + EXPECT_EQ(state, TaskRunState::Aborted); +} + } diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp new file mode 100644 index 00000000000..76112b1c729 --- /dev/null +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -0,0 +1,124 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/document/base/testdocman.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h> +#include <vespa/storage/distributor/operations/external/visitoroperation.h> +#include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/visitor.h> +#include <tests/distributor/distributortestutil.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; +using document::Bucket; +using document::BucketId; + +namespace storage::distributor { + +struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { + document::TestDocMan _test_doc_man; + VisitorOperation::Config _default_config; + std::unique_ptr<OperationOwner> _op_owner; + BucketId _superbucket; + BucketId _sub_bucket; + + ReadForWriteVisitorOperationStarterTest() + : _test_doc_man(), + _default_config(100, 100), + _op_owner(), + _superbucket(16, 4), + _sub_bucket(17, 4) + {} + + void SetUp() override { + createLinks(); + setupDistributor(1, 1, "version:1 distributor:1 storage:1"); + _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); + + addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); + } + + void TearDown() override { + close(); + } + + static Bucket default_bucket(BucketId id) { + return Bucket(document::FixedBucketSpaces::default_space(), id); + } + + std::shared_ptr<VisitorOperation> create_nested_visitor_op(bool valid_command = true) { + auto cmd = std::make_shared<api::CreateVisitorCommand>( + document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", ""); + if (valid_command) { + cmd->addBucketToBeVisited(_superbucket); + cmd->addBucketToBeVisited(BucketId()); // Will be inferred to first sub-bucket in DB + } + return std::make_shared<VisitorOperation>( + getExternalOperationHandler(), getExternalOperationHandler(), + getDistributorBucketSpace(), cmd, _default_config, + getDistributor().getMetrics().visits); + } + + OperationSequencer& operation_sequencer() { + return getExternalOperationHandler().operation_sequencer(); + } + + std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) { + return std::make_shared<ReadForWriteVisitorOperationStarter>( + std::move(visitor_op), operation_sequencer(), + *_op_owner, getDistributor().getPendingMessageTracker()); + } +}; + +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_that_fails_precondition_checks_is_immediately_failed) { + auto op = create_rfw_op(create_nested_visitor_op(false)); + _op_owner->start(op, OperationStarter::Priority(120)); + EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " + "ReturnCode(ILLEGAL_PARAMETERS, No buckets in CreateVisitorCommand for visitor 'foo')", + _sender.getLastReply()); +} + +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_no_pending_ops_to_bucket) { + auto op = create_rfw_op(create_nested_visitor_op(true)); + _op_owner->start(op, OperationStarter::Priority(120)); + ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); +} + +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_start_deferred_if_pending_ops_to_bucket) { + auto op = create_rfw_op(create_nested_visitor_op(true)); + // Pending mutating op to same bucket, prevents visitor from starting + auto update = std::make_shared<document::DocumentUpdate>( + _test_doc_man.getTypeRepo(), + *_test_doc_man.getTypeRepo().getDocumentType("testdoctype1"), + document::DocumentId("id::testdoctype1:n=4:foo")); + auto update_cmd = std::make_shared<api::UpdateCommand>( + default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0)); + + Operation::SP mutating_op; + getExternalOperationHandler().handleMessage(update_cmd, mutating_op); + ASSERT_TRUE(mutating_op); + _op_owner->start(mutating_op, OperationStarter::Priority(120)); + ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0", + _sender.getCommands(true, true)); + // Since pending message tracking normally happens in the distributor itself during sendUp, + // we have to emulate this and explicitly insert the sent message into the pending mapping. + getDistributor().getPendingMessageTracker().insert(_sender.command(0)); + + _op_owner->start(op, OperationStarter::Priority(120)); + // Nothing started yet + ASSERT_EQ("", _sender.getCommands(true, false, 1)); + + // Pretend update operation completed + auto update_reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply()); + getDistributor().getPendingMessageTracker().reply(*update_reply); + _op_owner->handleReply(update_reply); + + // Visitor should now be started! + ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true, false, 1)); +} + +} diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index 81c6486eaeb..29130515d2a 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -9,6 +9,7 @@ vespa_add_library(storage_common OBJECT global_bucket_space_distribution_converter.cpp messagebucket.cpp messagesender.cpp + reindexing_constants.cpp servicelayercomponent.cpp statusmessages.cpp statusmetricconsumer.cpp diff --git a/storage/src/vespa/storage/common/reindexing_constants.cpp b/storage/src/vespa/storage/common/reindexing_constants.cpp new file mode 100644 index 00000000000..c8330a43ffb --- /dev/null +++ b/storage/src/vespa/storage/common/reindexing_constants.cpp @@ -0,0 +1,12 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "reindexing_constants.h" + +namespace storage { + +const char* reindexing_bucket_lock_bypass_value() noexcept { + // This is by design a string that will fail to parse as a valid document selection in the backend. + // It's only used to bypass the read-for-write visitor bucket lock. + return "@@__vespa_internal_allow_through_bucket_lock"; +} + +} diff --git a/storage/src/vespa/storage/common/reindexing_constants.h b/storage/src/vespa/storage/common/reindexing_constants.h new file mode 100644 index 00000000000..be7d47b503c --- /dev/null +++ b/storage/src/vespa/storage/common/reindexing_constants.h @@ -0,0 +1,8 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace storage { + +const char* reindexing_bucket_lock_bypass_value() noexcept; + +} diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 4f3bc7a6caf..158cb782f87 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -83,7 +83,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), - _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg), + _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, + _idealStateManager, _operationOwner, compReg), _threadPool(threadPool), _initializingIsUp(true), _doneInitializeHandler(doneInitHandler), @@ -217,6 +218,7 @@ void Distributor::onClose() { } LOG(debug, "Distributor::onClose invoked"); + _pendingMessageTracker.abort_deferred_tasks(); _bucketDBUpdater.flush(); _externalOperationHandler.close_pending(); _operationOwner.onClose(); diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index e3a6056a723..cb8cabe07af 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -11,8 +11,10 @@ #include <vespa/storage/distributor/operations/external/getoperation.h> #include <vespa/storage/distributor/operations/external/statbucketoperation.h> #include <vespa/storage/distributor/operations/external/statbucketlistoperation.h> +#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h> #include <vespa/storage/distributor/operations/external/removelocationoperation.h> #include <vespa/storage/distributor/operations/external/visitoroperation.h> +#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> @@ -55,11 +57,13 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator& gen, + OperationOwner& operation_owner, DistributorComponentRegister& compReg) : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"), _direct_dispatch_sender(std::make_unique<DirectDispatchSender>(owner)), _operationGenerator(gen), _rejectFeedBeforeTimeReached(), // At epoch + _distributor_operation_owner(operation_owner), _non_main_thread_ops_mutex(), _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()), _concurrent_gets_enabled(false), @@ -239,8 +243,7 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op( } } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) -{ +bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) { auto& metrics = getMetrics().puts; if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) { return true; @@ -250,11 +253,26 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) cmd->setTimestamp(getUniqueTimestamp()); } - auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); - if (allowMutation(handle)) { - document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace(); + const auto bucket_space = cmd->getBucket().getBucketSpace(); + auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId()); + bool allow = allowMutation(handle); + const auto& tas_cond = cmd->getCondition(); + const bool bypass_bucket_lock = (tas_cond.isPresent() + && (tas_cond.getSelection() == reindexing_bucket_lock_bypass_value())); + if (bypass_bucket_lock) { + if (!allow && handle.was_blocked() && (handle.blocked_by() == SequencingHandle::BlockedBy::LockedBucket)) { + cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op + allow = true; + } else { + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, + "Operation expects a read-for-write bucket lock to be present, " + "but none currently exists")); + return true; + } + } + if (allow) { _op = std::make_shared<PutOperation>(*this, *this, - _bucketSpaceRepo.get(bucketSpace), + _bucketSpaceRepo.get(bucket_space), std::move(cmd), getMetrics().puts, std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); @@ -264,8 +282,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update) -{ +bool ExternalOperationHandler::onUpdate(const std::shared_ptr<api::UpdateCommand>& cmd) { auto& metrics = getMetrics().updates; if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) { return true; @@ -274,11 +291,11 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update) if (cmd->getTimestamp() == 0) { cmd->setTimestamp(getUniqueTimestamp()); } - auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); + const auto bucket_space = cmd->getBucket().getBucketSpace(); + auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId()); if (allowMutation(handle)) { - document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace(); _op = std::make_shared<TwoPhaseUpdateOperation>(*this, *this, *this, - _bucketSpaceRepo.get(bucketSpace), + _bucketSpaceRepo.get(bucket_space), std::move(cmd), getMetrics(), std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); @@ -288,8 +305,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update) } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove) -{ +bool ExternalOperationHandler::onRemove(const std::shared_ptr<api::RemoveCommand>& cmd) { auto& metrics = getMetrics().removes; if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) { return true; @@ -298,9 +314,10 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove) if (cmd->getTimestamp() == 0) { cmd->setTimestamp(getUniqueTimestamp()); } - auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); + const auto bucket_space = cmd->getBucket().getBucketSpace(); + auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId()); if (allowMutation(handle)) { - auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); + auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket_space)); _op = std::make_shared<RemoveOperation>(*this, *this, distributorBucketSpace, std::move(cmd), getMetrics().removes, std::move(handle)); @@ -311,8 +328,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove) return true; } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) -{ +bool ExternalOperationHandler::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd) { document::BucketId bid; RemoveLocationOperation::getBucketId(*this, *this, *cmd, bid); document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid); @@ -357,14 +373,12 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation( desired_get_read_consistency()); } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) -{ +bool ExternalOperationHandler::onGet(const std::shared_ptr<api::GetCommand>& cmd) { _op = try_generate_get_operation(cmd); return true; } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) -{ +bool ExternalOperationHandler::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd) { auto& metrics = getMetrics().stats; bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); @@ -373,8 +387,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) return true; } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList) -{ +bool ExternalOperationHandler::onGetBucketList(const std::shared_ptr<api::GetBucketListCommand>& cmd) { auto& metrics = getMetrics().getbucketlists; bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); @@ -384,13 +397,19 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList) return true; } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) -{ +bool ExternalOperationHandler::onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>& cmd) { // TODO same handling as Gets (VisitorOperation needs to change) const DistributorConfiguration& config(getDistributor().getConfig()); VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = Operation::SP(new VisitorOperation(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits)); + auto visit_op = std::make_shared<VisitorOperation>(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits); + if (visit_op->is_read_for_write()) { + _op = std::make_shared<ReadForWriteVisitorOperationStarter>(std::move(visit_op), _operation_sequencer, + _distributor_operation_owner, + getDistributor().getPendingMessageTracker()); + } else { + _op = std::move(visit_op); + } return true; } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 60cad15a791..00dc9a49d74 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -11,6 +11,8 @@ #include <chrono> #include <mutex> +namespace documentapi { class TestAndSetCondition; } + namespace storage { class PersistenceOperationMetricSet; @@ -21,6 +23,7 @@ class DistributorMetricSet; class Distributor; class MaintenanceOperationGenerator; class DirectDispatchSender; +class OperationOwner; class ExternalOperationHandler : public DistributorComponent, public api::MessageHandler @@ -29,19 +32,20 @@ public: using Clock = std::chrono::system_clock; using TimePoint = std::chrono::time_point<Clock>; - DEF_MSG_COMMAND_H(Get); - DEF_MSG_COMMAND_H(Put); - DEF_MSG_COMMAND_H(Update); - DEF_MSG_COMMAND_H(Remove); - DEF_MSG_COMMAND_H(RemoveLocation); - DEF_MSG_COMMAND_H(StatBucket); - DEF_MSG_COMMAND_H(CreateVisitor); - DEF_MSG_COMMAND_H(GetBucketList); + bool onGet(const std::shared_ptr<api::GetCommand>&) override; + bool onPut(const std::shared_ptr<api::PutCommand>&) override; + bool onUpdate(const std::shared_ptr<api::UpdateCommand>&) override; + bool onRemove(const std::shared_ptr<api::RemoveCommand>&) override; + bool onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>&) override; + bool onStatBucket(const std::shared_ptr<api::StatBucketCommand>&) override; + bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override; + bool onGetBucketList(const std::shared_ptr<api::GetBucketListCommand>&) override; ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, - const MaintenanceOperationGenerator&, + const MaintenanceOperationGenerator& gen, + OperationOwner& operation_owner, DistributorComponentRegister& compReg); ~ExternalOperationHandler() override; @@ -74,12 +78,18 @@ public: return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed); } + // Exposed for testing + OperationSequencer& operation_sequencer() noexcept { + return _operation_sequencer; + } + private: std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender; const MaintenanceOperationGenerator& _operationGenerator; - OperationSequencer _mutationSequencer; + OperationSequencer _operation_sequencer; Operation::SP _op; TimePoint _rejectFeedBeforeTimeReached; + OperationOwner& _distributor_operation_owner; mutable std::mutex _non_main_thread_ops_mutex; OperationOwner _non_main_thread_ops_owner; std::atomic<bool> _concurrent_gets_enabled; diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp index d268028de78..aa48cb6c950 100644 --- a/storage/src/vespa/storage/distributor/operation_sequencer.cpp +++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp @@ -4,8 +4,7 @@ #include <vespa/document/base/documentid.h> #include <cassert> -namespace storage { -namespace distributor { +namespace storage::distributor { void SequencingHandle::release() { if (valid()) { @@ -14,27 +13,49 @@ void SequencingHandle::release() { } } -OperationSequencer::OperationSequencer() { -} - -OperationSequencer::~OperationSequencer() { -} +OperationSequencer::OperationSequencer() = default; +OperationSequencer::~OperationSequencer() = default; -SequencingHandle OperationSequencer::try_acquire(const document::DocumentId& id) { +SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id) { const document::GlobalId gid(id.getGlobalId()); + if (!_active_buckets.empty()) { + auto doc_bucket_id = gid.convertToBucketId(); + // TODO avoid O(n), but sub bucket resolving is tricky and we expect the number + // of locked buckets to be in the range of 0 to <very small number>. + for (const auto& entry : _active_buckets) { + if ((entry.getBucketSpace() == bucket_space) + && entry.getBucketId().contains(doc_bucket_id)) + { + return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket); + } + } + } const auto inserted = _active_gids.insert(gid); if (inserted.second) { return SequencingHandle(*this, gid); } else { - return SequencingHandle(); + return SequencingHandle(SequencingHandle::BlockedBy::PendingOperation); + } +} + +SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket) { + const auto inserted = _active_buckets.insert(bucket); + if (inserted.second) { + return SequencingHandle(*this, bucket); + } else { + return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket); } } void OperationSequencer::release(const SequencingHandle& handle) { assert(handle.valid()); - _active_gids.erase(handle.gid()); + if (handle.has_gid()) { + _active_gids.erase(handle.gid()); + } else { + assert(handle.has_bucket()); + [[maybe_unused]] auto erased = _active_buckets.erase(handle.bucket()); + assert(erased == 1u); + } } -} // distributor -} // storage - +} // storage::distributor diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h index 7a523b61e53..ff8df2cd039 100644 --- a/storage/src/vespa/storage/distributor/operation_sequencer.h +++ b/storage/src/vespa/storage/distributor/operation_sequencer.h @@ -2,8 +2,11 @@ #pragma once #include <vespa/document/base/globalid.h> +#include <vespa/document/bucket/bucket.h> #include <vespa/vespalib/stllike/hash_set.h> +#include <set> #include <utility> +#include <variant> namespace document { class DocumentId; @@ -15,19 +18,42 @@ class OperationSequencer; /** * Represents a move-only handle which effectively holds a guard for - * allowing sequenced operations towards a particular document ID. + * allowing sequenced operations towards a particular document ID or + * bucket ID. * * Destroying a handle will implicitly release the guard, allowing * new sequenced operations towards the ID. */ class SequencingHandle { +public: + enum class BlockedBy { + PendingOperation, + LockedBucket + }; +private: OperationSequencer* _sequencer; - document::GlobalId _gid; + using HandleVariant = std::variant<document::Bucket, document::GlobalId, BlockedBy>; + HandleVariant _handle; public: - SequencingHandle() noexcept : _sequencer(nullptr) {} - SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid) - : _sequencer(&sequencer), - _gid(gid) + SequencingHandle() noexcept + : _sequencer(nullptr), + _handle() + {} + + explicit SequencingHandle(BlockedBy blocked_by) + : _sequencer(nullptr), + _handle(blocked_by) + {} + + SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid) noexcept + : _sequencer(&sequencer), + _handle(gid) + { + } + + SequencingHandle(OperationSequencer& sequencer, const document::Bucket& bucket) + : _sequencer(&sequencer), + _handle(bucket) { } @@ -39,8 +65,8 @@ public: SequencingHandle& operator=(const SequencingHandle&) = delete; SequencingHandle(SequencingHandle&& rhs) noexcept - : _sequencer(rhs._sequencer), - _gid(rhs._gid) + : _sequencer(rhs._sequencer), + _handle(std::move(rhs._handle)) { rhs._sequencer = nullptr; } @@ -48,13 +74,30 @@ public: SequencingHandle& operator=(SequencingHandle&& rhs) noexcept { if (&rhs != this) { std::swap(_sequencer, rhs._sequencer); - std::swap(_gid, rhs._gid); + std::swap(_handle, rhs._handle); } return *this; } - bool valid() const noexcept { return (_sequencer != nullptr); } - const document::GlobalId& gid() const noexcept { return _gid; } + [[nodiscard]] bool valid() const noexcept { return (_sequencer != nullptr); } + [[nodiscard]] bool was_blocked() const noexcept { + return std::holds_alternative<BlockedBy>(_handle); + } + [[nodiscard]] BlockedBy blocked_by() const noexcept { + return std::get<BlockedBy>(_handle); // FIXME can actually throw + } + [[nodiscard]] bool has_bucket() const noexcept { + return std::holds_alternative<document::Bucket>(_handle); + } + const document::Bucket& bucket() const noexcept { + return std::get<document::Bucket>(_handle); // FIXME can actually throw + } + [[nodiscard]] bool has_gid() const noexcept { + return std::holds_alternative<document::GlobalId>(_handle); + } + const document::GlobalId& gid() const noexcept { + return std::get<document::GlobalId>(_handle); // FIXME can actually throw + } void release(); }; @@ -68,7 +111,10 @@ public: */ class OperationSequencer { using GidSet = vespalib::hash_set<document::GlobalId, document::GlobalId::hash>; - GidSet _active_gids; + using BucketSet = std::set<document::Bucket>; + + GidSet _active_gids; + BucketSet _active_buckets; friend class SequencingHandle; public: @@ -76,8 +122,11 @@ public: ~OperationSequencer(); // Returns a handle with valid() == true iff no concurrent operations are - // already active for `id`. - SequencingHandle try_acquire(const document::DocumentId& id); + // already active for `id` _and_ the there are no active bucket locks for + // any bucket that may contain `id`. + SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id); + + SequencingHandle try_acquire(const document::Bucket& bucket); private: void release(const SequencingHandle& handle); }; diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h index 39881550466..37412b08d2e 100644 --- a/storage/src/vespa/storage/distributor/operationowner.h +++ b/storage/src/vespa/storage/distributor/operationowner.h @@ -59,7 +59,7 @@ public: : _sender(sender), _clock(clock) { } - ~OperationOwner(); + ~OperationOwner() override; /** Handles replies from storage, mapping from a message id to an operation. @@ -82,6 +82,8 @@ public: */ void erase(api::StorageMessage::Id msgId); + [[nodiscard]] DistributorMessageSender& sender() noexcept { return _sender; } + void onClose(); uint32_t size() const { return _sentMessageMap.size(); } std::string toString() const; diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt index bb2992034f7..57a5a6cdde5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(storage_distributoroperationexternal OBJECT getoperation.cpp newest_replica.cpp putoperation.cpp + read_for_write_visitor_operation.cpp removelocationoperation.cpp removeoperation.cpp statbucketlistoperation.cpp diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp new file mode 100644 index 00000000000..1e87172e870 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp @@ -0,0 +1,74 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "read_for_write_visitor_operation.h" +#include "visitoroperation.h" +#include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storage/distributor/operationowner.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".operations.external.read_for_write_visitor_operation"); + +namespace storage::distributor { + +ReadForWriteVisitorOperationStarter::ReadForWriteVisitorOperationStarter( + std::shared_ptr<VisitorOperation> visitor_op, + OperationSequencer& operation_sequencer, + OperationOwner& stable_operation_owner, + PendingMessageTracker& message_tracker) + : _visitor_op(std::move(visitor_op)), + _operation_sequencer(operation_sequencer), + _stable_operation_owner(stable_operation_owner), + _message_tracker(message_tracker) +{ +} + +ReadForWriteVisitorOperationStarter::~ReadForWriteVisitorOperationStarter() = default; + +void ReadForWriteVisitorOperationStarter::onClose(DistributorMessageSender& sender) { + _visitor_op->onClose(sender); +} + +void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& sender) { + if (_visitor_op->verify_command_and_expand_buckets(sender)) { + assert(!_visitor_op->has_sent_reply()); + auto maybe_bucket = _visitor_op->first_bucket_to_visit(); + if (!maybe_bucket) { + LOG(debug, "No buckets found to visit, tagging visitor complete"); + // No buckets to be found, start op to trigger immediate reply. + _visitor_op->start(sender, _startTime); + assert(_visitor_op->has_sent_reply()); + return; + } + auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket); + if (!bucket_handle.valid()) { + LOG(debug, "An operation is already pending for bucket %s, failing visitor", + maybe_bucket->toString().c_str()); + _visitor_op->fail_with_bucket_already_locked(sender); + return; + } + LOG(debug, "Possibly deferring start of visitor for bucket %s", maybe_bucket->toString().c_str()); + _message_tracker.run_once_no_pending_for_bucket( + *maybe_bucket, + make_deferred_task([self = shared_from_this(), handle = std::move(bucket_handle)](TaskRunState state) mutable { + LOG(debug, "Starting deferred visitor"); + self->_visitor_op->assign_bucket_lock_handle(std::move(handle)); + if (state == TaskRunState::OK) { + // Once started, ownership of _visitor_op will pass to the Distributor's OperationOwner + self->_stable_operation_owner.start(self->_visitor_op, 120/*TODO*/); + } else { + self->_visitor_op->onClose(self->_stable_operation_owner.sender()); + } + })); + } else { + LOG(debug, "Failed verification of visitor, responding immediately"); + assert(_visitor_op->has_sent_reply()); + } +} + +void ReadForWriteVisitorOperationStarter::onReceive(DistributorMessageSender& sender, + const std::shared_ptr<api::StorageReply> & msg) { + _visitor_op->onReceive(sender, msg); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h new file mode 100644 index 00000000000..06b4f60307e --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/distributor/operations/operation.h> +#include <vespa/storage/distributor/operation_sequencer.h> +#include <memory> + +namespace storage::distributor { + +class PendingMessageTracker; +class VisitorOperation; +class OperationOwner; + +/** + * Operation starting indirection for a visitor operation that has the semantics + * of an exclusive bucket lock. Such operations can only resolve to a single + * super-bucket/sub-bucket pair and care should be taken to avoid starving client + * operations through long-running locks. + * + * Operation starting may be deferred to the PendingMessageTracker if there are + * pending operations to the sub-bucket when onStart is called. If so, the deferred + * operation start takes place automatically and immediately when all pending + * bucket operations have completed. These will be started in the context of the + * OperationOwner provided to the operation. + */ +class ReadForWriteVisitorOperationStarter + : public Operation, + public std::enable_shared_from_this<ReadForWriteVisitorOperationStarter> +{ + std::shared_ptr<VisitorOperation> _visitor_op; + OperationSequencer& _operation_sequencer; + OperationOwner& _stable_operation_owner; + PendingMessageTracker& _message_tracker; +public: + ReadForWriteVisitorOperationStarter(std::shared_ptr<VisitorOperation> visitor_op, + OperationSequencer& operation_sequencer, + OperationOwner& stable_operation_owner, + PendingMessageTracker& message_tracker); + ~ReadForWriteVisitorOperationStarter() override; + + const char* getName() const override { return "ReadForWriteVisitorOperationStarter"; } + void onClose(DistributorMessageSender& sender) override; + void onStart(DistributorMessageSender& sender) override; + void onReceive(DistributorMessageSender& sender, + const std::shared_ptr<api::StorageReply> & msg) override; +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index c35a6671c8d..cf6922b8606 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -43,6 +43,24 @@ VisitorOperation::BucketInfo::toString() const VisitorOperation::SuperBucketInfo::~SuperBucketInfo() = default; +namespace { + +[[nodiscard]] bool +matches_visitor_library(vespalib::stringref input, vespalib::stringref expected) +{ + if (input.size() != expected.size()) { + return false; + } + for (size_t i = 0; i < input.size(); ++i) { + if (static_cast<char>(std::tolower(static_cast<unsigned char>(input[i]))) != expected[i]) { + return false; + } + } + return true; +} + +} + VisitorOperation::VisitorOperation( DistributorNodeContext& node_ctx, DistributorOperationContext& op_ctx, @@ -55,11 +73,14 @@ VisitorOperation::VisitorOperation( _op_ctx(op_ctx), _bucketSpace(bucketSpace), _msg(m), - _sentReply(false), _config(config), _metrics(metrics), _trace(TRACE_SOFT_MEMORY_LIMIT), - _operationTimer(_node_ctx.clock()) + _operationTimer(_node_ctx.clock()), + _bucket_lock(), // Initially no lock is held + _sentReply(false), + _verified_and_expanded(false), + _is_read_for_write(matches_visitor_library(_msg->getLibraryName(), "reindexingvisitor")) { const std::vector<document::BucketId>& buckets = m->getBuckets(); @@ -245,7 +266,7 @@ VisitorOperation::verifyVisitorDistributionBitCount( const document::BucketId& bid) { const lib::ClusterState& clusterState = _bucketSpace.getClusterState(); - if (_msg->getDocumentSelection().length() == 0 + if (_msg->getDocumentSelection().empty() && bid.getUsedBits() != clusterState.getDistributionBitCount()) { LOG(debug, @@ -332,6 +353,13 @@ VisitorOperation::verifyCreateVisitorCommand(DistributorMessageSender& sender) verifyOperationContainsBuckets(); verifyOperationHasSuperbucketAndProgress(); verifyOperationSentToCorrectDistributor(); + // TODO wrap and test + if (is_read_for_write() && (_msg->getMaxBucketsPerVisitor() != 1)) { + throw VisitorVerificationException( + api::ReturnCode::ILLEGAL_PARAMETERS, + vespalib::make_string("Read-for-write visitors can only have 1 max pending bucket, was %u", + _msg->getMaxBucketsPerVisitor())); + } return true; } catch (const VisitorVerificationException& e) { LOG(debug, @@ -382,14 +410,6 @@ VisitorOperation::pickBucketsToVisit(const std::vector<BucketDatabase::Entry>& b } bool -VisitorOperation::expandBucketAll() -{ - std::vector<BucketDatabase::Entry> entries; - _bucketSpace.getBucketDatabase().getAll(_superBucket.bid, entries); - return pickBucketsToVisit(entries); -} - -bool VisitorOperation::expandBucketContaining() { std::vector<BucketDatabase::Entry> entries; @@ -568,13 +588,24 @@ VisitorOperation::pickTargetNode( void VisitorOperation::onStart(DistributorMessageSender& sender) { - if (!verifyCreateVisitorCommand(sender)) { - return; + if (!_verified_and_expanded) { + if (!verify_command_and_expand_buckets(sender)) { + return; + } } + startNewVisitors(sender); +} +bool +VisitorOperation::verify_command_and_expand_buckets(DistributorMessageSender& sender) +{ + assert(!_verified_and_expanded); + _verified_and_expanded = true; + if (!verifyCreateVisitorCommand(sender)) { + return false; + } expandBucket(); - - startNewVisitors(sender); + return true; } bool @@ -770,6 +801,7 @@ VisitorOperation::sendStorageVisitor(uint16_t node, uint32_t pending, DistributorMessageSender& sender) { + // TODO rewrite to not use copy ctor and remove wonky StorageCommand copy ctor impl auto cmd = std::make_shared<api::CreateVisitorCommand>(*_msg); cmd->getBuckets() = buckets; @@ -848,4 +880,26 @@ VisitorOperation::onClose(DistributorMessageSender& sender) sendReply(api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"), sender); } +void +VisitorOperation::fail_with_bucket_already_locked(DistributorMessageSender& sender) +{ + assert(is_read_for_write()); + sendReply(api::ReturnCode(api::ReturnCode::BUSY, "This bucket is already locked by another operation"), sender); +} + +std::optional<document::Bucket> +VisitorOperation::first_bucket_to_visit() const +{ + if (_superBucket.subBuckets.empty()) { + return {}; + } + return {document::Bucket(_msg->getBucketSpace(), _superBucket.subBuckets.begin()->first)}; +} + +void +VisitorOperation::assign_bucket_lock_handle(SequencingHandle handle) +{ + _bucket_lock = std::move(handle); +} + } diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index 28be7dfe353..12164e3cba9 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -2,12 +2,14 @@ #pragma once #include <vespa/storage/distributor/operations/operation.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/storage/bucketdb/bucketdatabase.h> #include <vespa/storage/visiting/memory_bounded_trace.h> #include <vespa/storageapi/defs.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageframework/generic/clock/timer.h> +#include <optional> namespace document { class Document; } @@ -36,7 +38,7 @@ public: VisitorOperation(DistributorNodeContext& node_ctx, DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::CreateVisitorCommand> & msg, + const std::shared_ptr<api::CreateVisitorCommand>& msg, const Config& config, VisitorMetricSet& metrics); @@ -47,9 +49,19 @@ public: void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; + // Only valid to call if is_read_for_write() == true + void fail_with_bucket_already_locked(DistributorMessageSender& sender); + + [[nodiscard]] bool verify_command_and_expand_buckets(DistributorMessageSender& sender); + const char* getName() const override { return "visit"; } std::string getStatus() const override { return ""; } + [[nodiscard]] bool has_sent_reply() const noexcept { return _sentReply; } + [[nodiscard]] bool is_read_for_write() const noexcept { return _is_read_for_write; } + [[nodiscard]] std::optional<document::Bucket> first_bucket_to_visit() const; + void assign_bucket_lock_handle(SequencingHandle handle); + private: struct BucketInfo { bool done; @@ -63,7 +75,7 @@ private: vespalib::string toString() const; }; - typedef std::map<document::BucketId, BucketInfo> VisitBucketMap; + using VisitBucketMap = std::map<document::BucketId, BucketInfo>; struct SuperBucketInfo { document::BucketId bid; @@ -71,7 +83,7 @@ private: VisitBucketMap subBuckets; std::vector<document::BucketId> subBucketsVisitOrder; - SuperBucketInfo(const document::BucketId& b = document::BucketId(0)) + explicit SuperBucketInfo(const document::BucketId& b = document::BucketId(0)) : bid(b), subBucketsCompletelyExpanded(false) { @@ -80,8 +92,8 @@ private: }; - typedef std::map<uint16_t, std::vector<document::BucketId> > NodeToBucketsMap; - typedef std::map<uint64_t, api::CreateVisitorCommand::SP> SentMessagesMap; + using NodeToBucketsMap = std::map<uint16_t, std::vector<document::BucketId>>; + using SentMessagesMap = std::map<uint64_t, api::CreateVisitorCommand::SP>; void sendReply(const api::ReturnCode& code, DistributorMessageSender& sender); void updateReplyMetrics(const api::ReturnCode& result); @@ -94,15 +106,12 @@ private: void verifyOperationSentToCorrectDistributor(); bool verifyCreateVisitorCommand(DistributorMessageSender& sender); bool pickBucketsToVisit(const std::vector<BucketDatabase::Entry>& buckets); - bool expandBucketAll(); bool expandBucketContaining(); bool expandBucketContained(); void expandBucket(); int pickTargetNode( const BucketDatabase::Entry& entry, const std::vector<uint16_t>& triedNodes); - void attemptToParseOrderingSelector(); - bool documentSelectionMayHaveOrdering() const; bool maySendNewStorageVisitors() const noexcept; void startNewVisitors(DistributorMessageSender& sender); void initializeActiveNodes(); @@ -148,7 +157,6 @@ private: api::CreateVisitorCommand::SP _msg; api::ReturnCode _storageError; - bool _sentReply; SuperBucketInfo _superBucket; document::BucketId _lastBucket; @@ -167,11 +175,13 @@ private: static constexpr size_t TRACE_SOFT_MEMORY_LIMIT = 65536; - bool done(); - bool hasNoPendingMessages(); document::BucketId getLastBucketVisited(); mbus::TraceNode trace; framework::MilliSecTimer _operationTimer; + SequencingHandle _bucket_lock; + bool _sentReply; + bool _verified_and_expanded; + bool _is_read_for_write; }; } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 715ac1722b4..8027b5349f9 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -71,7 +71,7 @@ pairAsRange(Pair pair) std::vector<uint64_t> PendingMessageTracker::clearMessagesForNode(uint16_t node) { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); MessagesByNodeAndBucket& idx(boost::multi_index::get<1>(_messages)); auto range = pairAsRange(idx.equal_range(boost::make_tuple(node))); @@ -88,7 +88,7 @@ PendingMessageTracker::clearMessagesForNode(uint16_t node) void PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg) { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); if (msg->getAddress()) { _messages.emplace(currentTime(), msg->getType().getId(), msg->getPriority(), msg->getMsgId(), msg->getBucket(), msg->getAddress()->getIndex()); @@ -103,7 +103,7 @@ PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg) document::Bucket PendingMessageTracker::reply(const api::StorageReply& r) { - std::lock_guard<std::mutex> guard(_lock); + std::unique_lock guard(_lock); document::Bucket bucket; LOG(debug, "Got reply: %s", r.toString().c_str()); @@ -121,6 +121,16 @@ PendingMessageTracker::reply(const api::StorageReply& r) } LOG(debug, "Erased message with id %" PRIu64, msgId); msgs.erase(msgId); + auto deferred_tasks = get_deferred_ops_if_bucket_pending_drained(bucket); + // Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker. + // To avoid deadlocking, we run the tasks outside the lock. + // TODO remove locking entirely... Only exists for status pages! + guard.unlock(); + // We expect this to be "effectively noexcept", i.e. any tasks throwing an + // exception will end up nuking the distributor process from the unwind. + for (auto& task : deferred_tasks) { + task->run(TaskRunState::OK); + } } return bucket; @@ -129,6 +139,57 @@ PendingMessageTracker::reply(const api::StorageReply& r) namespace { template <typename Range> +bool is_empty_range(const Range& range) noexcept { + return (range.first == range.second); +} + +} + +std::vector<std::unique_ptr<DeferredTask>> +PendingMessageTracker::get_deferred_ops_if_bucket_pending_drained(const document::Bucket& bucket) +{ + if (_deferred_bucket_tasks.empty()) { + return {}; + } + std::vector<std::unique_ptr<DeferredTask>> tasks; + auto& bucket_idx = boost::multi_index::get<2>(_messages); + auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); + if (is_empty_range(pending_tasks_for_bucket)) { + auto waiting_tasks = _deferred_bucket_tasks.equal_range(bucket); + for (auto task_iter = waiting_tasks.first; task_iter != waiting_tasks.second; ++task_iter) { + tasks.emplace_back(std::move(task_iter->second)); + } + _deferred_bucket_tasks.erase(waiting_tasks.first, waiting_tasks.second); + } + return tasks; +} + +void +PendingMessageTracker::run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task) +{ + std::unique_lock guard(_lock); + auto& bucket_idx = boost::multi_index::get<2>(_messages); + const auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); + if (is_empty_range(pending_tasks_for_bucket)) { + guard.unlock(); // Must not be held whilst running task, or else recursive sends will deadlock. + task->run(TaskRunState::OK); // Nothing pending, run immediately. + } else { + _deferred_bucket_tasks.emplace(bucket, std::move(task)); + } +} + +void +PendingMessageTracker::abort_deferred_tasks() +{ + std::lock_guard guard(_lock); + for (auto& task : _deferred_bucket_tasks) { + task.second->run(TaskRunState::Aborted); + } +} + +namespace { + +template <typename Range> void runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range) { @@ -144,7 +205,7 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range) void PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucket &bucket, Checker& checker) const { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket))); @@ -154,7 +215,7 @@ PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucke void PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Checker& checker) const { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages)); auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket))); @@ -164,7 +225,7 @@ PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Chec bool PendingMessageTracker::hasPendingMessage(uint16_t node, const document::Bucket &bucket, uint32_t messageType) const { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); auto range = msgs.equal_range(boost::make_tuple(node, bucket, messageType)); @@ -181,7 +242,7 @@ PendingMessageTracker::getStatusStartPage(std::ostream& out) const void PendingMessageTracker::getStatusPerBucket(std::ostream& out) const { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages); using BucketMap = std::map<document::Bucket, std::vector<vespalib::string>>; BucketMap perBucketMsgs; @@ -210,7 +271,7 @@ PendingMessageTracker::getStatusPerBucket(std::ostream& out) const void PendingMessageTracker::getStatusPerNode(std::ostream& out) const { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages); int lastNode = -1; for (const auto & node : msgs) { diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 2dba244dc9f..5756c1ef8ab 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -15,7 +15,6 @@ #include <boost/multi_index/ordered_index.hpp> #include <boost/multi_index/sequenced_index.hpp> #include <boost/multi_index/composite_key.hpp> - #include <set> #include <unordered_map> #include <chrono> @@ -23,14 +22,41 @@ namespace storage::distributor { -class PendingMessageTracker : public framework::HtmlStatusReporter -{ +enum class TaskRunState { + OK, + Aborted, + BucketLost +}; + +struct DeferredTask { + virtual ~DeferredTask() = default; + virtual void run(TaskRunState state) = 0; +}; +template <typename Func> +class LambdaDeferredTask : public DeferredTask { + Func _func; +public: + explicit LambdaDeferredTask(Func&& f) : _func(std::move(f)) {} + LambdaDeferredTask(const LambdaDeferredTask&) = delete; + LambdaDeferredTask(LambdaDeferredTask&&) = delete; + ~LambdaDeferredTask() override = default; + + void run(TaskRunState state) override { + _func(state); + } +}; + +template <typename Func> +std::unique_ptr<DeferredTask> make_deferred_task(Func&& f) { + return std::make_unique<LambdaDeferredTask<std::decay_t<Func>>>(std::forward<Func>(f)); +} + +class PendingMessageTracker : public framework::HtmlStatusReporter { public: class Checker { public: - virtual ~Checker() {} - + virtual ~Checker() = default; virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0; }; @@ -42,8 +68,8 @@ public: */ using TimePoint = std::chrono::milliseconds; - PendingMessageTracker(framework::ComponentRegister&); - ~PendingMessageTracker(); + explicit PendingMessageTracker(framework::ComponentRegister&); + ~PendingMessageTracker() override; void insert(const std::shared_ptr<api::StorageMessage>&); document::Bucket reply(const api::StorageReply& reply); @@ -89,6 +115,8 @@ public: _nodeBusyDuration = secs; } + void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task); + void abort_deferred_tasks(); private: struct MessageEntry { TimePoint timeStamp; @@ -130,23 +158,29 @@ private: { }; - typedef boost::multi_index::multi_index_container < + using Messages = boost::multi_index::multi_index_container < MessageEntry, boost::multi_index::indexed_by< boost::multi_index::ordered_unique<MessageIdKey>, boost::multi_index::ordered_non_unique<CompositeNodeBucketKey>, boost::multi_index::ordered_non_unique<CompositeBucketMsgNodeKey> > - > Messages; - - typedef Messages::nth_index<0>::type MessagesByMsgId; - typedef Messages::nth_index<1>::type MessagesByNodeAndBucket; - typedef Messages::nth_index<2>::type MessagesByBucketAndType; - - Messages _messages; - framework::Component _component; - NodeInfo _nodeInfo; - std::chrono::seconds _nodeBusyDuration; + >; + + using MessagesByMsgId = Messages::nth_index<0>::type; + using MessagesByNodeAndBucket = Messages::nth_index<1>::type; + using MessagesByBucketAndType = Messages::nth_index<2>::type; + using DeferredBucketTaskMap = std::unordered_multimap< + document::Bucket, + std::unique_ptr<DeferredTask>, + document::Bucket::hash + >; + + Messages _messages; + framework::Component _component; + NodeInfo _nodeInfo; + std::chrono::seconds _nodeBusyDuration; + DeferredBucketTaskMap _deferred_bucket_tasks; // Since distributor is currently single-threaded, this will only // contend when status page is being accessed. It is, however, required @@ -169,6 +203,8 @@ private: void getStatusPerNode(std::ostream& out) const; void getStatusPerBucket(std::ostream& out) const; TimePoint currentTime() const; + + std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_pending_drained(const document::Bucket&); }; } diff --git a/storage/src/vespa/storage/visiting/CMakeLists.txt b/storage/src/vespa/storage/visiting/CMakeLists.txt index f8999cd7dcb..27c8b8853b8 100644 --- a/storage/src/vespa/storage/visiting/CMakeLists.txt +++ b/storage/src/vespa/storage/visiting/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(storage_visitor OBJECT dumpvisitorsingle.cpp memory_bounded_trace.cpp recoveryvisitor.cpp + reindexing_visitor.cpp testvisitor.cpp visitor.cpp visitormanager.cpp diff --git a/storage/src/vespa/storage/visiting/dumpvisitorsingle.h b/storage/src/vespa/storage/visiting/dumpvisitorsingle.h index 4cc538d6fd7..ccfc8a5a3cf 100644 --- a/storage/src/vespa/storage/visiting/dumpvisitorsingle.h +++ b/storage/src/vespa/storage/visiting/dumpvisitorsingle.h @@ -26,11 +26,10 @@ struct DumpVisitorSingleFactory : public VisitorFactory { VisitorEnvironment::UP makeVisitorEnvironment(StorageComponent&) override { - return VisitorEnvironment::UP(new VisitorEnvironment); + return std::make_unique<VisitorEnvironment>(); }; Visitor* - makeVisitor(StorageComponent& c, VisitorEnvironment&, const vdslib::Parameters& params) override { return new DumpVisitorSingle(c, params); } diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp new file mode 100644 index 00000000000..8d2b4736582 --- /dev/null +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "reindexing_visitor.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/storage/common/reindexing_constants.h> + +#include <vespa/log/log.h> +LOG_SETUP(".visitor.instance.reindexing_visitor"); + +namespace storage { + +ReindexingVisitor::ReindexingVisitor(StorageComponent& component) + : Visitor(component) +{ +} + +void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, + std::vector<spi::DocEntry::UP>& entries, + HitCounter& hitCounter) +{ + LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size()); + for (auto& entry : entries) { + if (entry->isRemove()) { + // We don't reindex removed documents, as that would be very silly. + continue; + } + const uint32_t doc_size = entry->getDocumentSize(); + hitCounter.addHit(*entry->getDocumentId(), doc_size); + auto msg = std::make_unique<documentapi::PutDocumentMessage>(entry->releaseDocument()); + msg->setApproxSize(doc_size); + msg->setCondition(documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value())); + sendMessage(std::move(msg)); + } +} + +} diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h new file mode 100644 index 00000000000..1d2e9504ec9 --- /dev/null +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h @@ -0,0 +1,27 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "visitor.h" + +namespace storage { + +class ReindexingVisitor : public Visitor { +public: + explicit ReindexingVisitor(StorageComponent& component); + ~ReindexingVisitor() override = default; + +private: + void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override; +}; + +struct ReindexingVisitorFactory : public VisitorFactory { + VisitorEnvironment::UP makeVisitorEnvironment(StorageComponent&) override { + return std::make_unique<VisitorEnvironment>(); + }; + + Visitor* makeVisitor(StorageComponent& c, VisitorEnvironment&, const vdslib::Parameters&) override { + return new ReindexingVisitor(c); + } +}; + +} diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 7c138afeb0f..bc77fd268d5 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -6,6 +6,7 @@ #include "countvisitor.h" #include "testvisitor.h" #include "recoveryvisitor.h" +#include "reindexing_visitor.h" #include <vespa/storage/common/statusmessages.h> #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/stringfmt.h> @@ -55,6 +56,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>(); _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>(); _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>(); + _visitorFactories["reindexingvisitor"] = std::make_shared<ReindexingVisitorFactory>(); _component.registerStatusPage(*this); } |