From aa28f51fd49151f75934bf5e710c5c0ebae2ab8d Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 11 Dec 2020 14:08:38 +0000 Subject: Only let reindexing puts through locked bucket if their token matches that of the lock Avoids race condition edge case where reindexing puts from an outdated visitor may pass through a bucket lock intended for a newly created visitor operation Tokens are 128-bit values derived from a CSPRNG, so uniqueness is for all intents and purposes guaranteed. --- .../distributor/externaloperationhandlertest.cpp | 24 +++++++--- .../tests/distributor/idealstatemanagertest.cpp | 2 +- .../src/tests/distributor/mergeoperationtest.cpp | 2 +- .../tests/distributor/operation_sequencer_test.cpp | 17 ++++--- .../read_for_write_visitor_operation_test.cpp | 30 ++++++++++++- storage/src/tests/distributor/splitbuckettest.cpp | 2 +- .../src/tests/distributor/visitoroperationtest.cpp | 18 ++++++++ .../vespa/storage/common/reindexing_constants.cpp | 6 ++- .../vespa/storage/common/reindexing_constants.h | 4 +- .../src/vespa/storage/distributor/CMakeLists.txt | 1 + .../storage/distributor/crypto_uuid_generator.cpp | 20 +++++++++ .../storage/distributor/crypto_uuid_generator.h | 18 ++++++++ .../distributor/externaloperationhandler.cpp | 34 +++++++++++--- .../storage/distributor/externaloperationhandler.h | 2 + .../storage/distributor/operation_sequencer.cpp | 19 ++++---- .../storage/distributor/operation_sequencer.h | 52 +++++++++++++++------- .../external/read_for_write_visitor_operation.cpp | 14 ++++-- .../external/read_for_write_visitor_operation.h | 5 ++- .../operations/external/visitoroperation.cpp | 11 +++++ .../operations/external/visitoroperation.h | 2 + .../src/vespa/storage/distributor/uuid_generator.h | 19 ++++++++ .../vespa/storage/visiting/reindexing_visitor.cpp | 14 +++++- .../vespa/storage/visiting/reindexing_visitor.h | 1 + storage/src/vespa/storage/visiting/visitor.cpp | 8 +++- storage/src/vespa/storage/visiting/visitor.h | 6 ++- 25 files changed, 268 insertions(+), 63 deletions(-) create mode 100644 storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp create mode 100644 storage/src/vespa/storage/distributor/crypto_uuid_generator.h create mode 100644 storage/src/vespa/storage/distributor/uuid_generator.h (limited to 'storage') diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index b230781a90c..520e5c31cfb 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -569,15 +569,16 @@ struct OperationHandlerSequencingTest : ExternalOperationHandlerTest { set_up_distributor_for_sequencing_test(); } - static documentapi::TestAndSetCondition bucket_lock_bypass_tas_condition() { - return documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value()); + static documentapi::TestAndSetCondition bucket_lock_bypass_tas_condition(const vespalib::string& token) { + return documentapi::TestAndSetCondition( + vespalib::make_string("%s=%s", reindexing_bucket_lock_bypass_prefix(), token.c_str())); } }; TEST_F(OperationHandlerSequencingTest, put_not_allowed_through_locked_bucket_if_special_tas_token_not_present) { auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); auto bucket = makeDocumentBucket(document::BucketId(16, 1)); - auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket); + auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket, "foo"); ASSERT_TRUE(bucket_handle.valid()); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); } @@ -586,19 +587,28 @@ TEST_F(OperationHandlerSequencingTest, put_allowed_through_locked_bucket_if_spec set_up_distributor_for_sequencing_test(); auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); - put->setCondition(bucket_lock_bypass_tas_condition()); + put->setCondition(bucket_lock_bypass_tas_condition("foo")); auto bucket = makeDocumentBucket(document::BucketId(16, 1)); - auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket); + auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket, "foo"); ASSERT_TRUE(bucket_handle.valid()); Operation::SP op; ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(put, op)); } +TEST_F(OperationHandlerSequencingTest, put_not_allowed_through_locked_bucket_if_tas_token_mismatches_current_lock_tkoen) { + auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); + put->setCondition(bucket_lock_bypass_tas_condition("bar")); + auto bucket = makeDocumentBucket(document::BucketId(16, 1)); + auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket, "foo"); + ASSERT_TRUE(bucket_handle.valid()); + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); +} + TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_no_bucket_lock_present) { auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); - put->setCondition(bucket_lock_bypass_tas_condition()); + put->setCondition(bucket_lock_bypass_tas_condition("foo")); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); // TODO determine most appropriate error code here. Want to fail the bucket but // not the entire visitor operation. Will likely need to be revisited (heh!) soon. @@ -611,7 +621,7 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte // present, this tests the case where a lock is present but it's not a bucket-level lock. TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_document_lock_present) { auto put = makePutCommand("testdoctype1", _dummy_id); - put->setCondition(bucket_lock_bypass_tas_condition()); + put->setCondition(bucket_lock_bypass_tas_condition("foo")); Operation::SP op; ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op)); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put))); diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d5374907723..398c4c70ef6 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -255,7 +255,7 @@ TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) { msg->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); tracker.insert(msg); } - auto token = op_seq.try_acquire(bucket); + auto token = op_seq.try_acquire(bucket, "foo"); EXPECT_TRUE(token.valid()); { RemoveBucketOperation op("storage", BucketAndNodes(bucket, toVector(0))); diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 852c2ef8754..90fcf40b3fe 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -422,7 +422,7 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { op.setIdealStateManager(&getIdealStateManager()); EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); - auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1))); + auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo"); EXPECT_TRUE(token.valid()); EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); } diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp index 7c9a22b5226..968d41dd98b 100644 --- a/storage/src/tests/distributor/operation_sequencer_test.cpp +++ b/storage/src/tests/distributor/operation_sequencer_test.cpp @@ -37,7 +37,8 @@ TEST_F(OperationSequencerTest, cannot_get_sequencing_handle_for_id_with_existing auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd")); EXPECT_FALSE(second_handle.valid()); ASSERT_TRUE(second_handle.is_blocked()); - EXPECT_EQ(second_handle.blocked_by(), SequencingHandle::BlockedBy::PendingOperation); + EXPECT_TRUE(second_handle.is_blocked_by_pending_operation()); + EXPECT_FALSE(second_handle.is_blocked_by_bucket()); } TEST_F(OperationSequencerTest, can_get_sequencing_handle_for_different_ids) { @@ -63,17 +64,19 @@ TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_f TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) { const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1)); EXPECT_FALSE(sequencer.is_blocked(bucket)); - auto bucket_handle = sequencer.try_acquire(bucket); + auto bucket_handle = sequencer.try_acquire(bucket, "foo"); EXPECT_TRUE(bucket_handle.valid()); EXPECT_TRUE(sequencer.is_blocked(bucket)); auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_FALSE(doc_handle.valid()); ASSERT_TRUE(doc_handle.is_blocked()); - EXPECT_EQ(doc_handle.blocked_by(), SequencingHandle::BlockedBy::LockedBucket); + ASSERT_TRUE(doc_handle.is_blocked_by_bucket()); + EXPECT_TRUE(doc_handle.is_bucket_blocked_with_token("foo")); + EXPECT_FALSE(doc_handle.is_bucket_blocked_with_token("bar")); } TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bucket) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)), "foo"); EXPECT_TRUE(bucket_handle.valid()); // Note: different sub-bucket than the lock auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=2:abcd")); @@ -82,7 +85,7 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bu TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) { const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1)); - auto bucket_handle = sequencer.try_acquire(bucket); + auto bucket_handle = sequencer.try_acquire(bucket, "foo"); bucket_handle.release(); auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_TRUE(doc_handle.valid()); @@ -90,14 +93,14 @@ TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_ac } TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)), "foo"); EXPECT_TRUE(bucket_handle.valid()); auto doc_handle = sequencer.try_acquire(global_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_TRUE(doc_handle.valid()); } TEST_F(OperationSequencerTest, is_blocked_is_bucket_space_aware) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)), "foo"); EXPECT_FALSE(sequencer.is_blocked(document::Bucket(global_space(), document::BucketId(16, 1)))); } diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 320c55f9998..33c3b747015 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -4,11 +4,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -32,6 +34,15 @@ api::StorageMessageAddress make_storage_address(uint16_t node) { return {&_storage, lib::NodeType::STORAGE, node}; } +struct MockUuidGenerator : UuidGenerator { + vespalib::string _uuid; + MockUuidGenerator() : _uuid("a-very-random-id") {} + + vespalib::string generate_uuid() const override { + return _uuid; + } +}; + } struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { @@ -40,13 +51,15 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { std::unique_ptr _op_owner; BucketId _superbucket; BucketId _sub_bucket; + MockUuidGenerator _mock_uuid_generator; ReadForWriteVisitorOperationStarterTest() : _test_doc_man(), _default_config(100, 100), _op_owner(), _superbucket(16, 4), - _sub_bucket(17, 4) + _sub_bucket(17, 4), + _mock_uuid_generator() {} void SetUp() override { @@ -82,7 +95,8 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { std::shared_ptr create_rfw_op(std::shared_ptr visitor_op) { return std::make_shared( std::move(visitor_op), operation_sequencer(), - *_op_owner, getDistributor().getPendingMessageTracker()); + *_op_owner, getDistributor().getPendingMessageTracker(), + _mock_uuid_generator); } }; @@ -192,4 +206,16 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_bounced_if_bucket_remove _sender.getReplies(false, true)); } +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_locks_bucket_with_random_token_with_parameter_propagation) { + _mock_uuid_generator._uuid = "fritjof"; + auto op = create_rfw_op(create_nested_visitor_op(true)); + _op_owner->start(op, OperationStarter::Priority(120)); + ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); + auto cmd = dynamic_pointer_cast(_sender.command(0)); + ASSERT_TRUE(cmd); + EXPECT_EQ(cmd->getParameters().get(reindexing_bucket_lock_visitor_parameter_key(), + vespalib::stringref("not found :I")), + "fritjof"); +} + } diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index 9bb2e8b04a7..a4a26fd28b9 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -313,7 +313,7 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { maxSplitBits, splitCount, splitByteSize); EXPECT_FALSE(op.isBlocked(tracker, op_seq)); - auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket)); + auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo"); EXPECT_TRUE(token.valid()); EXPECT_TRUE(op.isBlocked(tracker, op_seq)); } diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index f37ded7aacb..c4f72c312c7 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -1088,4 +1089,21 @@ TEST_F(VisitorOperationTest, statistical_metrics_not_updated_on_wrong_distributi EXPECT_DOUBLE_EQ(0.0, defaultVisitorMetrics().latency.getCount()); } +TEST_F(VisitorOperationTest, assigning_put_lock_access_token_sets_special_visitor_parameter) { + document::BucketId id(0x400000000000007bULL); + enableDistributorClusterState("distributor:1 storage:1"); + addNodesToBucketDB(id, "0=1/1/1/t"); + + auto op = createOpWithDefaultConfig(createVisitorCommand("metricstats", id, nullId)); + op->assign_put_lock_access_token("its-a me, mario"); + + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); + auto cmd = std::dynamic_pointer_cast(_sender.command(0)); + ASSERT_TRUE(cmd); + EXPECT_EQ(cmd->getParameters().get(reindexing_bucket_lock_visitor_parameter_key(), + vespalib::stringref("")), + "its-a me, mario"); +} + } diff --git a/storage/src/vespa/storage/common/reindexing_constants.cpp b/storage/src/vespa/storage/common/reindexing_constants.cpp index c8330a43ffb..1c72f9a9d64 100644 --- a/storage/src/vespa/storage/common/reindexing_constants.cpp +++ b/storage/src/vespa/storage/common/reindexing_constants.cpp @@ -3,10 +3,14 @@ namespace storage { -const char* reindexing_bucket_lock_bypass_value() noexcept { +const char* reindexing_bucket_lock_bypass_prefix() noexcept { // This is by design a string that will fail to parse as a valid document selection in the backend. // It's only used to bypass the read-for-write visitor bucket lock. return "@@__vespa_internal_allow_through_bucket_lock"; } +const char* reindexing_bucket_lock_visitor_parameter_key() noexcept { + return "__vespa_internal_reindexing_bucket_token"; +} + } diff --git a/storage/src/vespa/storage/common/reindexing_constants.h b/storage/src/vespa/storage/common/reindexing_constants.h index be7d47b503c..7552ab332b1 100644 --- a/storage/src/vespa/storage/common/reindexing_constants.h +++ b/storage/src/vespa/storage/common/reindexing_constants.h @@ -3,6 +3,8 @@ namespace storage { -const char* reindexing_bucket_lock_bypass_value() noexcept; +const char* reindexing_bucket_lock_bypass_prefix() noexcept; + +const char* reindexing_bucket_lock_visitor_parameter_key() noexcept; } diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 7fc13046b3f..dd301f0c284 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -9,6 +9,7 @@ vespa_add_library(storage_distributor bucketlistmerger.cpp bucket_space_distribution_context.cpp clusterinformation.cpp + crypto_uuid_generator.cpp distributor_bucket_space.cpp distributor_bucket_space_repo.cpp distributor.cpp diff --git a/storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp b/storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp new file mode 100644 index 00000000000..3f9ab0b529d --- /dev/null +++ b/storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "crypto_uuid_generator.h" +#include + +namespace storage::distributor { + +vespalib::string CryptoUuidGenerator::generate_uuid() const { + unsigned char rand_buf[16]; + vespalib::crypto::random_buffer(rand_buf, sizeof(rand_buf)); + const char hex[16+1] = "0123456789abcdef"; + vespalib::string ret(sizeof(rand_buf) * 2, '\0'); + for (size_t i = 0; i < sizeof(rand_buf); ++i) { + ret[i*2 + 0] = hex[rand_buf[i] >> 4]; + ret[i*2 + 1] = hex[rand_buf[i] & 0x0f]; + } + return ret; +} + +} diff --git a/storage/src/vespa/storage/distributor/crypto_uuid_generator.h b/storage/src/vespa/storage/distributor/crypto_uuid_generator.h new file mode 100644 index 00000000000..40d2bd732b7 --- /dev/null +++ b/storage/src/vespa/storage/distributor/crypto_uuid_generator.h @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "uuid_generator.h" + +namespace storage::distributor { + +/** + * Generates a 128-bit unique identifier (represented as a hex string) from + * a cryptographically strong source of pseudo-randomness. + */ +class CryptoUuidGenerator : public UuidGenerator { +public: + ~CryptoUuidGenerator() override = default; + vespalib::string generate_uuid() const override; +}; + +} diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index aebea14b1f1..7c468457430 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucket_space_distribution_context.h" +#include "crypto_uuid_generator.h" #include "externaloperationhandler.h" #include "distributor.h" #include "operation_sequencer.h" @@ -81,6 +82,7 @@ ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ _distributor_operation_owner(operation_owner), _non_main_thread_ops_mutex(), _non_main_thread_ops_owner(*_direct_dispatch_sender, _node_ctx.clock()), + _uuid_generator(std::make_unique()), _concurrent_gets_enabled(false), _use_weak_internal_read_consistency_for_gets(false) { @@ -262,9 +264,19 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op( namespace { -bool put_is_allowed_through_bucket_lock(const api::PutCommand& cmd) { +bool put_is_from_reindexing_visitor(const api::PutCommand& cmd) { const auto& tas_cond = cmd.getCondition(); - return (tas_cond.isPresent() && (tas_cond.getSelection() == reindexing_bucket_lock_bypass_value())); + return (tas_cond.isPresent() && (tas_cond.getSelection().starts_with(reindexing_bucket_lock_bypass_prefix()))); +} + +// Precondition: put_is_from_reindexing_visitor(cmd) == true +std::string extract_reindexing_token(const api::PutCommand& cmd) { + const std::string& tas_str = cmd.getCondition().getSelection(); + auto eq_idx = tas_str.find_first_of('='); + if (eq_idx != std::string::npos) { + return tas_str.substr(eq_idx + 1); + } + return ""; } } @@ -282,10 +294,17 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr& cmd const auto bucket_space = cmd->getBucket().getBucketSpace(); auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId()); bool allow = allowMutation(handle); - if (put_is_allowed_through_bucket_lock(*cmd)) { - if (!allow && handle.is_blocked_by(SequencingHandle::BlockedBy::LockedBucket)) { - cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op - allow = true; + if (put_is_from_reindexing_visitor(*cmd)) { + auto expect_token = extract_reindexing_token(*cmd); + if (!allow && handle.is_blocked_by_bucket()) { + if (handle.is_bucket_blocked_with_token(expect_token)) { + cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op + allow = true; + } else { + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, + "Expected bucket lock token did not match actual lock token")); + return true; + } } else { bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, "Operation expects a read-for-write bucket lock to be present, " @@ -429,7 +448,8 @@ bool ExternalOperationHandler::onCreateVisitor(const std::shared_ptris_read_for_write()) { _op = std::make_shared(std::move(visit_op), _operation_sequencer, _distributor_operation_owner, - _op_ctx.pending_message_tracker()); + _op_ctx.pending_message_tracker(), + *_uuid_generator); } else { _op = std::move(visit_op); } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 1a02ff92f58..9127325702a 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -25,6 +25,7 @@ class DirectDispatchSender; class SequencingHandle; class OperationSequencer; class OperationOwner; +class UuidGenerator; class ExternalOperationHandler : public api::MessageHandler { @@ -100,6 +101,7 @@ private: OperationOwner& _distributor_operation_owner; mutable std::mutex _non_main_thread_ops_mutex; OperationOwner _non_main_thread_ops_owner; + std::unique_ptr _uuid_generator; std::atomic _concurrent_gets_enabled; std::atomic _use_weak_internal_read_consistency_for_gets; diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp index 2eeba79a053..234952c1e34 100644 --- a/storage/src/vespa/storage/distributor/operation_sequencer.cpp +++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp @@ -2,6 +2,7 @@ #include "operation_sequencer.h" #include +#include #include namespace storage::distributor { @@ -23,10 +24,10 @@ SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_sp // TODO avoid O(n), but sub bucket resolving is tricky and we expect the number // of locked buckets to be in the range of 0 to . for (const auto& entry : _active_buckets) { - if ((entry.getBucketSpace() == bucket_space) - && entry.getBucketId().contains(doc_bucket_id)) + if ((entry.first.getBucketSpace() == bucket_space) + && entry.first.getBucketId().contains(doc_bucket_id)) { - return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket); + return SequencingHandle(SequencingHandle::BlockedByLockedBucket(entry.second)); } } } @@ -34,16 +35,17 @@ SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_sp if (inserted.second) { return SequencingHandle(*this, gid); } else { - return SequencingHandle(SequencingHandle::BlockedBy::PendingOperation); + return SequencingHandle(SequencingHandle::BlockedByPendingOperation()); } } -SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket) { - const auto inserted = _active_buckets.insert(bucket); +SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket, + const vespalib::string& token) { + const auto inserted = _active_buckets.insert(std::make_pair(bucket, token)); if (inserted.second) { return SequencingHandle(*this, bucket); } else { - return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket); + return SequencingHandle(SequencingHandle::BlockedByLockedBucket(inserted.first->second)); } } @@ -57,8 +59,7 @@ void OperationSequencer::release(const SequencingHandle& handle) { _active_gids.erase(handle.gid()); } else { assert(handle.has_bucket()); - [[maybe_unused]] auto erased = _active_buckets.erase(handle.bucket()); - assert(erased == 1u); + _active_buckets.erase(handle.bucket()); } } diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h index f9ff6b32e0b..72a6f547a0d 100644 --- a/storage/src/vespa/storage/distributor/operation_sequencer.h +++ b/storage/src/vespa/storage/distributor/operation_sequencer.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include @@ -26,25 +26,38 @@ class OperationSequencer; */ class SequencingHandle { public: - enum class BlockedBy { - PendingOperation, - LockedBucket + struct BlockedByPendingOperation {}; + struct BlockedByLockedBucket { + vespalib::string lock_token; + + BlockedByLockedBucket() = default; + explicit BlockedByLockedBucket(vespalib::stringref token) : lock_token(token) {} }; private: OperationSequencer* _sequencer; - using HandleVariant = std::variant; - HandleVariant _handle; + using HandleVariant = std::variant< + document::Bucket, + document::GlobalId, + BlockedByPendingOperation, + BlockedByLockedBucket + >; + HandleVariant _handle; public: SequencingHandle() noexcept : _sequencer(nullptr), _handle() {} - explicit SequencingHandle(BlockedBy blocked_by) + explicit SequencingHandle(BlockedByPendingOperation blocked_by) : _sequencer(nullptr), _handle(blocked_by) {} + explicit SequencingHandle(BlockedByLockedBucket blocked_by) + : _sequencer(nullptr), + _handle(std::move(blocked_by)) + {} + SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid) noexcept : _sequencer(&sequencer), _handle(gid) @@ -81,13 +94,18 @@ public: [[nodiscard]] bool valid() const noexcept { return (_sequencer != nullptr); } [[nodiscard]] bool is_blocked() const noexcept { - return std::holds_alternative(_handle); + return (std::holds_alternative(_handle) || + std::holds_alternative(_handle)); + } + [[nodiscard]] bool is_blocked_by_pending_operation() const noexcept { + return std::holds_alternative(_handle); } - [[nodiscard]] BlockedBy blocked_by() const noexcept { - return std::get(_handle); // FIXME can actually throw + [[nodiscard]] bool is_blocked_by_bucket() const noexcept { + return std::holds_alternative(_handle); } - [[nodiscard]] bool is_blocked_by(BlockedBy blocker) const noexcept { - return (is_blocked() && blocked_by() == blocker); + [[nodiscard]] bool is_bucket_blocked_with_token(vespalib::stringref token) const noexcept { + return (std::holds_alternative(_handle) && + (std::get(_handle).lock_token == token)); } [[nodiscard]] bool has_bucket() const noexcept { return std::holds_alternative(_handle); @@ -113,11 +131,11 @@ public: * can be acquired for that ID until the original handle has been destroyed. */ class OperationSequencer { - using GidSet = vespalib::hash_set; - using BucketSet = std::set; + using GidSet = vespalib::hash_set; + using BucketLocks = vespalib::hash_map; - GidSet _active_gids; - BucketSet _active_buckets; + GidSet _active_gids; + BucketLocks _active_buckets; friend class SequencingHandle; public: @@ -129,7 +147,7 @@ public: // any bucket that may contain `id`. SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id); - SequencingHandle try_acquire(const document::Bucket& bucket); + SequencingHandle try_acquire(const document::Bucket& bucket, const vespalib::string& token); bool is_blocked(const document::Bucket&) const noexcept; private: diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp index 5ebf20138a4..04e64703c19 100644 --- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -16,11 +17,13 @@ ReadForWriteVisitorOperationStarter::ReadForWriteVisitorOperationStarter( std::shared_ptr visitor_op, OperationSequencer& operation_sequencer, OperationOwner& stable_operation_owner, - PendingMessageTracker& message_tracker) + PendingMessageTracker& message_tracker, + UuidGenerator& uuid_generator) : _visitor_op(std::move(visitor_op)), _operation_sequencer(operation_sequencer), _stable_operation_owner(stable_operation_owner), - _message_tracker(message_tracker) + _message_tracker(message_tracker), + _uuid_generator(uuid_generator) { } @@ -46,14 +49,17 @@ void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& send _visitor_op->fail_with_merge_pending(sender); return; } - auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket); + auto token = _uuid_generator.generate_uuid(); + auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket, token); if (!bucket_handle.valid()) { LOG(debug, "An operation is already pending for bucket %s, failing visitor", maybe_bucket->toString().c_str()); _visitor_op->fail_with_bucket_already_locked(sender); return; } - LOG(debug, "Possibly deferring start of visitor for bucket %s", maybe_bucket->toString().c_str()); + _visitor_op->assign_put_lock_access_token(token); + LOG(debug, "Possibly deferring start of visitor for bucket %s, using lock token %s", + maybe_bucket->toString().c_str(), token.c_str()); _message_tracker.run_once_no_pending_for_bucket( *maybe_bucket, make_deferred_task([self = shared_from_this(), handle = std::move(bucket_handle)](TaskRunState state) mutable { diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h index a6b414e6fb5..e9391f9f133 100644 --- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h @@ -10,6 +10,7 @@ namespace storage::distributor { class PendingMessageTracker; class VisitorOperation; class OperationOwner; +class UuidGenerator; /** * Operation starting indirection for a visitor operation that has the semantics @@ -31,11 +32,13 @@ class ReadForWriteVisitorOperationStarter OperationSequencer& _operation_sequencer; OperationOwner& _stable_operation_owner; PendingMessageTracker& _message_tracker; + UuidGenerator& _uuid_generator; public: ReadForWriteVisitorOperationStarter(std::shared_ptr visitor_op, OperationSequencer& operation_sequencer, OperationOwner& stable_operation_owner, - PendingMessageTracker& message_tracker); + PendingMessageTracker& message_tracker, + UuidGenerator& uuid_generator); ~ReadForWriteVisitorOperationStarter() override; const char* getName() const override { return "ReadForWriteVisitorOperationStarter"; } diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index 5a8adb26cd8..606b3931464 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "visitoroperation.h" +#include #include #include #include @@ -823,6 +824,10 @@ VisitorOperation::sendStorageVisitor(uint16_t node, cmd->setTimeout(timeLeft()); + if (!_put_lock_token.empty()) { + cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), _put_lock_token); + } + LOG(spam, "Priority is %d", cmd->getPriority()); LOG(debug, "Sending CreateVisitor command %" PRIu64 " for storage visitor '%s' to %s", cmd->getMsgId(), @@ -909,4 +914,10 @@ VisitorOperation::assign_bucket_lock_handle(SequencingHandle handle) _bucket_lock = std::move(handle); } +void +VisitorOperation::assign_put_lock_access_token(const vespalib::string& token) +{ + _put_lock_token = token; +} + } diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index e6ad7a042dd..179727a330c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -62,6 +62,7 @@ public: [[nodiscard]] bool is_read_for_write() const noexcept { return _is_read_for_write; } [[nodiscard]] std::optional first_bucket_to_visit() const; void assign_bucket_lock_handle(SequencingHandle handle); + void assign_put_lock_access_token(const vespalib::string& token); private: struct BucketInfo { @@ -179,6 +180,7 @@ private: mbus::TraceNode trace; framework::MilliSecTimer _operationTimer; SequencingHandle _bucket_lock; + vespalib::string _put_lock_token; bool _sentReply; bool _verified_and_expanded; bool _is_read_for_write; diff --git a/storage/src/vespa/storage/distributor/uuid_generator.h b/storage/src/vespa/storage/distributor/uuid_generator.h new file mode 100644 index 00000000000..ae133d83fd4 --- /dev/null +++ b/storage/src/vespa/storage/distributor/uuid_generator.h @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include + +namespace storage::distributor { + +/** + * Generator of universally unique identifiers (not actually conforming UUIDs, as these + * have MSB semantics) that are expected to be effectively collision free. + */ +class UuidGenerator { +public: + virtual ~UuidGenerator() = default; + // Returns a string that is guaranteed ASCII-only + virtual vespalib::string generate_uuid() const = 0; +}; + +} diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp index 8d2b4736582..2d96876b641 100644 --- a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp @@ -18,6 +18,7 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, std::vector& entries, HitCounter& hitCounter) { + auto lock_token = make_lock_access_token(); LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size()); for (auto& entry : entries) { if (entry->isRemove()) { @@ -28,9 +29,20 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, hitCounter.addHit(*entry->getDocumentId(), doc_size); auto msg = std::make_unique(entry->releaseDocument()); msg->setApproxSize(doc_size); - msg->setCondition(documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value())); + msg->setCondition(documentapi::TestAndSetCondition(lock_token)); sendMessage(std::move(msg)); } } +vespalib::string ReindexingVisitor::make_lock_access_token() const { + vespalib::string prefix = reindexing_bucket_lock_bypass_prefix(); + vespalib::stringref passed_token = visitor_parameters().get( + reindexing_bucket_lock_visitor_parameter_key(), + vespalib::stringref("")); + if (passed_token.empty()) { + return prefix; + } + return (prefix + "=" + passed_token); +} + } diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h index 9be21f24f5f..815ef7e67fa 100644 --- a/storage/src/vespa/storage/visiting/reindexing_visitor.h +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h @@ -21,6 +21,7 @@ public: private: void handleDocuments(const document::BucketId&, std::vector&, HitCounter&) override; + vespalib::string make_lock_access_token() const; }; struct ReindexingVisitorFactory : public VisitorFactory { diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 3618073da91..c846e0357b4 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -552,7 +552,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId, } void -Visitor::attach(std::shared_ptr initiatingCmd, +Visitor::attach(std::shared_ptr initiatingCmd, const mbus::Route& controlAddress, const mbus::Route& dataAddress, framework::MilliSecTime timeout) @@ -601,6 +601,12 @@ Visitor::addBoundedTrace(uint32_t level, const vespalib::string &message) { return _trace.add(std::move(tempTrace)); } +const vdslib::Parameters& +Visitor::visitor_parameters() const noexcept { + assert(_initiatingCmd); + return _initiatingCmd->getParameters(); +} + void Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& metrics) { diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 6f2b33629e6..6666308a0b9 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -307,7 +307,7 @@ private: // Used by visitor client to identify what visitor messages belong to api::StorageMessage::Id _visitorCmdId; api::VisitorId _visitorId; - std::shared_ptr _initiatingCmd; + std::shared_ptr _initiatingCmd; api::StorageMessage::Priority _priority; api::ReturnCode _result; @@ -348,6 +348,8 @@ protected: * Returns true iff message was added to internal trace tree. */ bool addBoundedTrace(uint32_t level, const vespalib::string& message); + + const vdslib::Parameters& visitor_parameters() const noexcept; public: Visitor(StorageComponent& component); virtual ~Visitor(); @@ -468,7 +470,7 @@ public: VisitorMessageSession::UP, documentapi::Priority::Value); - void attach(std::shared_ptr initiatingCmd, + void attach(std::shared_ptr initiatingCmd, const mbus::Route& controlAddress, const mbus::Route& dataAddress, framework::MilliSecTime timeout); -- cgit v1.2.3