summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-14 10:28:45 +0100
committerGitHub <noreply@github.com>2020-12-14 10:28:45 +0100
commita7eafc4a8b773785f944425bdf40ad9218a656a2 (patch)
tree4f4c5f3d8f92ba2d4c4a5e2490ab2c70801a72f3 /storage
parent6f479ad61a4a6d973ce6a985e25206e9adfdcfee (diff)
parentaa28f51fd49151f75934bf5e710c5c0ebae2ab8d (diff)
Merge pull request #15795 from vespa-engine/vekterli/use-random-bucket-lock-passthrough-token-for-reindexing-visitors
Only let reindexing puts through locked bucket if their TaS token matches that of the lock
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp24
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp2
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/operation_sequencer_test.cpp17
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp30
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp2
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp18
-rw-r--r--storage/src/vespa/storage/common/reindexing_constants.cpp6
-rw-r--r--storage/src/vespa/storage/common/reindexing_constants.h4
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp20
-rw-r--r--storage/src/vespa/storage/distributor/crypto_uuid_generator.h18
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp34
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h2
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.cpp19
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.h52
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/uuid_generator.h19
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.cpp14
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.h1
-rw-r--r--storage/src/vespa/storage/visiting/visitor.cpp8
-rw-r--r--storage/src/vespa/storage/visiting/visitor.h6
25 files changed, 268 insertions, 63 deletions
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index b230781a90c..520e5c31cfb 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -569,15 +569,16 @@ struct OperationHandlerSequencingTest : ExternalOperationHandlerTest {
set_up_distributor_for_sequencing_test();
}
- static documentapi::TestAndSetCondition bucket_lock_bypass_tas_condition() {
- return documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value());
+ static documentapi::TestAndSetCondition bucket_lock_bypass_tas_condition(const vespalib::string& token) {
+ return documentapi::TestAndSetCondition(
+ vespalib::make_string("%s=%s", reindexing_bucket_lock_bypass_prefix(), token.c_str()));
}
};
TEST_F(OperationHandlerSequencingTest, put_not_allowed_through_locked_bucket_if_special_tas_token_not_present) {
auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
auto bucket = makeDocumentBucket(document::BucketId(16, 1));
- auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket);
+ auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket, "foo");
ASSERT_TRUE(bucket_handle.valid());
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
}
@@ -586,19 +587,28 @@ TEST_F(OperationHandlerSequencingTest, put_allowed_through_locked_bucket_if_spec
set_up_distributor_for_sequencing_test();
auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
- put->setCondition(bucket_lock_bypass_tas_condition());
+ put->setCondition(bucket_lock_bypass_tas_condition("foo"));
auto bucket = makeDocumentBucket(document::BucketId(16, 1));
- auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket);
+ auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket, "foo");
ASSERT_TRUE(bucket_handle.valid());
Operation::SP op;
ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(put, op));
}
+TEST_F(OperationHandlerSequencingTest, put_not_allowed_through_locked_bucket_if_tas_token_mismatches_current_lock_tkoen) {
+ auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
+ put->setCondition(bucket_lock_bypass_tas_condition("bar"));
+ auto bucket = makeDocumentBucket(document::BucketId(16, 1));
+ auto bucket_handle = getExternalOperationHandler().operation_sequencer().try_acquire(bucket, "foo");
+ ASSERT_TRUE(bucket_handle.valid());
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
+}
+
TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_no_bucket_lock_present) {
auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
- put->setCondition(bucket_lock_bypass_tas_condition());
+ put->setCondition(bucket_lock_bypass_tas_condition("foo"));
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
// TODO determine most appropriate error code here. Want to fail the bucket but
// not the entire visitor operation. Will likely need to be revisited (heh!) soon.
@@ -611,7 +621,7 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte
// present, this tests the case where a lock is present but it's not a bucket-level lock.
TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejected_if_document_lock_present) {
auto put = makePutCommand("testdoctype1", _dummy_id);
- put->setCondition(bucket_lock_bypass_tas_condition());
+ put->setCondition(bucket_lock_bypass_tas_condition("foo"));
Operation::SP op;
ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op));
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put)));
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index a592ce37910..24470c09114 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -255,7 +255,7 @@ TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) {
msg->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
tracker.insert(msg);
}
- auto token = op_seq.try_acquire(bucket);
+ auto token = op_seq.try_acquire(bucket, "foo");
EXPECT_TRUE(token.valid());
{
RemoveBucketOperation op(&_Storage, BucketAndNodes(bucket, toVector<uint16_t>(0)));
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 852c2ef8754..90fcf40b3fe 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -422,7 +422,7 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
op.setIdealStateManager(&getIdealStateManager());
EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
- auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)));
+ auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo");
EXPECT_TRUE(token.valid());
EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
}
diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp
index 7c9a22b5226..968d41dd98b 100644
--- a/storage/src/tests/distributor/operation_sequencer_test.cpp
+++ b/storage/src/tests/distributor/operation_sequencer_test.cpp
@@ -37,7 +37,8 @@ TEST_F(OperationSequencerTest, cannot_get_sequencing_handle_for_id_with_existing
auto second_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test::abcd"));
EXPECT_FALSE(second_handle.valid());
ASSERT_TRUE(second_handle.is_blocked());
- EXPECT_EQ(second_handle.blocked_by(), SequencingHandle::BlockedBy::PendingOperation);
+ EXPECT_TRUE(second_handle.is_blocked_by_pending_operation());
+ EXPECT_FALSE(second_handle.is_blocked_by_bucket());
}
TEST_F(OperationSequencerTest, can_get_sequencing_handle_for_different_ids) {
@@ -63,17 +64,19 @@ TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_f
TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) {
const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1));
EXPECT_FALSE(sequencer.is_blocked(bucket));
- auto bucket_handle = sequencer.try_acquire(bucket);
+ auto bucket_handle = sequencer.try_acquire(bucket, "foo");
EXPECT_TRUE(bucket_handle.valid());
EXPECT_TRUE(sequencer.is_blocked(bucket));
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_FALSE(doc_handle.valid());
ASSERT_TRUE(doc_handle.is_blocked());
- EXPECT_EQ(doc_handle.blocked_by(), SequencingHandle::BlockedBy::LockedBucket);
+ ASSERT_TRUE(doc_handle.is_blocked_by_bucket());
+ EXPECT_TRUE(doc_handle.is_bucket_blocked_with_token("foo"));
+ EXPECT_FALSE(doc_handle.is_bucket_blocked_with_token("bar"));
}
TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bucket) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)), "foo");
EXPECT_TRUE(bucket_handle.valid());
// Note: different sub-bucket than the lock
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=2:abcd"));
@@ -82,7 +85,7 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bu
TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) {
const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1));
- auto bucket_handle = sequencer.try_acquire(bucket);
+ auto bucket_handle = sequencer.try_acquire(bucket, "foo");
bucket_handle.release();
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_TRUE(doc_handle.valid());
@@ -90,14 +93,14 @@ TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_ac
}
TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)), "foo");
EXPECT_TRUE(bucket_handle.valid());
auto doc_handle = sequencer.try_acquire(global_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_TRUE(doc_handle.valid());
}
TEST_F(OperationSequencerTest, is_blocked_is_bucket_space_aware) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)), "foo");
EXPECT_FALSE(sequencer.is_blocked(document::Bucket(global_space(), document::BucketId(16, 1))));
}
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 320c55f9998..33c3b747015 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -4,11 +4,13 @@
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/update/documentupdate.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storage/distributor/uuid_generator.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/visitor.h>
@@ -32,6 +34,15 @@ api::StorageMessageAddress make_storage_address(uint16_t node) {
return {&_storage, lib::NodeType::STORAGE, node};
}
+struct MockUuidGenerator : UuidGenerator {
+ vespalib::string _uuid;
+ MockUuidGenerator() : _uuid("a-very-random-id") {}
+
+ vespalib::string generate_uuid() const override {
+ return _uuid;
+ }
+};
+
}
struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
@@ -40,13 +51,15 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
std::unique_ptr<OperationOwner> _op_owner;
BucketId _superbucket;
BucketId _sub_bucket;
+ MockUuidGenerator _mock_uuid_generator;
ReadForWriteVisitorOperationStarterTest()
: _test_doc_man(),
_default_config(100, 100),
_op_owner(),
_superbucket(16, 4),
- _sub_bucket(17, 4)
+ _sub_bucket(17, 4),
+ _mock_uuid_generator()
{}
void SetUp() override {
@@ -82,7 +95,8 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) {
return std::make_shared<ReadForWriteVisitorOperationStarter>(
std::move(visitor_op), operation_sequencer(),
- *_op_owner, getDistributor().getPendingMessageTracker());
+ *_op_owner, getDistributor().getPendingMessageTracker(),
+ _mock_uuid_generator);
}
};
@@ -192,4 +206,16 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_bounced_if_bucket_remove
_sender.getReplies(false, true));
}
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_locks_bucket_with_random_token_with_parameter_propagation) {
+ _mock_uuid_generator._uuid = "fritjof";
+ auto op = create_rfw_op(create_nested_visitor_op(true));
+ _op_owner->start(op, OperationStarter::Priority(120));
+ ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
+ auto cmd = dynamic_pointer_cast<api::CreateVisitorCommand>(_sender.command(0));
+ ASSERT_TRUE(cmd);
+ EXPECT_EQ(cmd->getParameters().get(reindexing_bucket_lock_visitor_parameter_key(),
+ vespalib::stringref("not found :I")),
+ "fritjof");
+}
+
}
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index 9cb5fef600f..93351c0ab56 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -313,7 +313,7 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
maxSplitBits, splitCount, splitByteSize);
EXPECT_FALSE(op.isBlocked(tracker, op_seq));
- auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket));
+ auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo");
EXPECT_TRUE(token.valid());
EXPECT_TRUE(op.isBlocked(tracker, op_seq));
}
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index f37ded7aacb..c4f72c312c7 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -4,6 +4,7 @@
#include <vespa/storageapi/message/datagram.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
#include <vespa/storage/distributor/operations/external/visitororder.h>
#include <vespa/storage/distributor/distributormetricsset.h>
@@ -1088,4 +1089,21 @@ TEST_F(VisitorOperationTest, statistical_metrics_not_updated_on_wrong_distributi
EXPECT_DOUBLE_EQ(0.0, defaultVisitorMetrics().latency.getCount());
}
+TEST_F(VisitorOperationTest, assigning_put_lock_access_token_sets_special_visitor_parameter) {
+ document::BucketId id(0x400000000000007bULL);
+ enableDistributorClusterState("distributor:1 storage:1");
+ addNodesToBucketDB(id, "0=1/1/1/t");
+
+ auto op = createOpWithDefaultConfig(createVisitorCommand("metricstats", id, nullId));
+ op->assign_put_lock_access_token("its-a me, mario");
+
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
+ auto cmd = std::dynamic_pointer_cast<api::CreateVisitorCommand>(_sender.command(0));
+ ASSERT_TRUE(cmd);
+ EXPECT_EQ(cmd->getParameters().get(reindexing_bucket_lock_visitor_parameter_key(),
+ vespalib::stringref("")),
+ "its-a me, mario");
+}
+
}
diff --git a/storage/src/vespa/storage/common/reindexing_constants.cpp b/storage/src/vespa/storage/common/reindexing_constants.cpp
index c8330a43ffb..1c72f9a9d64 100644
--- a/storage/src/vespa/storage/common/reindexing_constants.cpp
+++ b/storage/src/vespa/storage/common/reindexing_constants.cpp
@@ -3,10 +3,14 @@
namespace storage {
-const char* reindexing_bucket_lock_bypass_value() noexcept {
+const char* reindexing_bucket_lock_bypass_prefix() noexcept {
// This is by design a string that will fail to parse as a valid document selection in the backend.
// It's only used to bypass the read-for-write visitor bucket lock.
return "@@__vespa_internal_allow_through_bucket_lock";
}
+const char* reindexing_bucket_lock_visitor_parameter_key() noexcept {
+ return "__vespa_internal_reindexing_bucket_token";
+}
+
}
diff --git a/storage/src/vespa/storage/common/reindexing_constants.h b/storage/src/vespa/storage/common/reindexing_constants.h
index be7d47b503c..7552ab332b1 100644
--- a/storage/src/vespa/storage/common/reindexing_constants.h
+++ b/storage/src/vespa/storage/common/reindexing_constants.h
@@ -3,6 +3,8 @@
namespace storage {
-const char* reindexing_bucket_lock_bypass_value() noexcept;
+const char* reindexing_bucket_lock_bypass_prefix() noexcept;
+
+const char* reindexing_bucket_lock_visitor_parameter_key() noexcept;
}
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 7fc13046b3f..dd301f0c284 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -9,6 +9,7 @@ vespa_add_library(storage_distributor
bucketlistmerger.cpp
bucket_space_distribution_context.cpp
clusterinformation.cpp
+ crypto_uuid_generator.cpp
distributor_bucket_space.cpp
distributor_bucket_space_repo.cpp
distributor.cpp
diff --git a/storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp b/storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp
new file mode 100644
index 00000000000..3f9ab0b529d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/crypto_uuid_generator.cpp
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "crypto_uuid_generator.h"
+#include <vespa/vespalib/crypto/random.h>
+
+namespace storage::distributor {
+
+vespalib::string CryptoUuidGenerator::generate_uuid() const {
+ unsigned char rand_buf[16];
+ vespalib::crypto::random_buffer(rand_buf, sizeof(rand_buf));
+ const char hex[16+1] = "0123456789abcdef";
+ vespalib::string ret(sizeof(rand_buf) * 2, '\0');
+ for (size_t i = 0; i < sizeof(rand_buf); ++i) {
+ ret[i*2 + 0] = hex[rand_buf[i] >> 4];
+ ret[i*2 + 1] = hex[rand_buf[i] & 0x0f];
+ }
+ return ret;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/crypto_uuid_generator.h b/storage/src/vespa/storage/distributor/crypto_uuid_generator.h
new file mode 100644
index 00000000000..40d2bd732b7
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/crypto_uuid_generator.h
@@ -0,0 +1,18 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "uuid_generator.h"
+
+namespace storage::distributor {
+
+/**
+ * Generates a 128-bit unique identifier (represented as a hex string) from
+ * a cryptographically strong source of pseudo-randomness.
+ */
+class CryptoUuidGenerator : public UuidGenerator {
+public:
+ ~CryptoUuidGenerator() override = default;
+ vespalib::string generate_uuid() const override;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index aebea14b1f1..7c468457430 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucket_space_distribution_context.h"
+#include "crypto_uuid_generator.h"
#include "externaloperationhandler.h"
#include "distributor.h"
#include "operation_sequencer.h"
@@ -81,6 +82,7 @@ ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_
_distributor_operation_owner(operation_owner),
_non_main_thread_ops_mutex(),
_non_main_thread_ops_owner(*_direct_dispatch_sender, _node_ctx.clock()),
+ _uuid_generator(std::make_unique<CryptoUuidGenerator>()),
_concurrent_gets_enabled(false),
_use_weak_internal_read_consistency_for_gets(false)
{
@@ -262,9 +264,19 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op(
namespace {
-bool put_is_allowed_through_bucket_lock(const api::PutCommand& cmd) {
+bool put_is_from_reindexing_visitor(const api::PutCommand& cmd) {
const auto& tas_cond = cmd.getCondition();
- return (tas_cond.isPresent() && (tas_cond.getSelection() == reindexing_bucket_lock_bypass_value()));
+ return (tas_cond.isPresent() && (tas_cond.getSelection().starts_with(reindexing_bucket_lock_bypass_prefix())));
+}
+
+// Precondition: put_is_from_reindexing_visitor(cmd) == true
+std::string extract_reindexing_token(const api::PutCommand& cmd) {
+ const std::string& tas_str = cmd.getCondition().getSelection();
+ auto eq_idx = tas_str.find_first_of('=');
+ if (eq_idx != std::string::npos) {
+ return tas_str.substr(eq_idx + 1);
+ }
+ return "";
}
}
@@ -282,10 +294,17 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
const auto bucket_space = cmd->getBucket().getBucketSpace();
auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId());
bool allow = allowMutation(handle);
- if (put_is_allowed_through_bucket_lock(*cmd)) {
- if (!allow && handle.is_blocked_by(SequencingHandle::BlockedBy::LockedBucket)) {
- cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op
- allow = true;
+ if (put_is_from_reindexing_visitor(*cmd)) {
+ auto expect_token = extract_reindexing_token(*cmd);
+ if (!allow && handle.is_blocked_by_bucket()) {
+ if (handle.is_bucket_blocked_with_token(expect_token)) {
+ cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op
+ allow = true;
+ } else {
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
+ "Expected bucket lock token did not match actual lock token"));
+ return true;
+ }
} else {
bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
"Operation expects a read-for-write bucket lock to be present, "
@@ -429,7 +448,8 @@ bool ExternalOperationHandler::onCreateVisitor(const std::shared_ptr<api::Create
if (visit_op->is_read_for_write()) {
_op = std::make_shared<ReadForWriteVisitorOperationStarter>(std::move(visit_op), _operation_sequencer,
_distributor_operation_owner,
- _op_ctx.pending_message_tracker());
+ _op_ctx.pending_message_tracker(),
+ *_uuid_generator);
} else {
_op = std::move(visit_op);
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 1a02ff92f58..9127325702a 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -25,6 +25,7 @@ class DirectDispatchSender;
class SequencingHandle;
class OperationSequencer;
class OperationOwner;
+class UuidGenerator;
class ExternalOperationHandler : public api::MessageHandler
{
@@ -100,6 +101,7 @@ private:
OperationOwner& _distributor_operation_owner;
mutable std::mutex _non_main_thread_ops_mutex;
OperationOwner _non_main_thread_ops_owner;
+ std::unique_ptr<UuidGenerator> _uuid_generator;
std::atomic<bool> _concurrent_gets_enabled;
std::atomic<bool> _use_weak_internal_read_consistency_for_gets;
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
index 2eeba79a053..234952c1e34 100644
--- a/storage/src/vespa/storage/distributor/operation_sequencer.cpp
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
@@ -2,6 +2,7 @@
#include "operation_sequencer.h"
#include <vespa/document/base/documentid.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>
namespace storage::distributor {
@@ -23,10 +24,10 @@ SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_sp
// TODO avoid O(n), but sub bucket resolving is tricky and we expect the number
// of locked buckets to be in the range of 0 to <very small number>.
for (const auto& entry : _active_buckets) {
- if ((entry.getBucketSpace() == bucket_space)
- && entry.getBucketId().contains(doc_bucket_id))
+ if ((entry.first.getBucketSpace() == bucket_space)
+ && entry.first.getBucketId().contains(doc_bucket_id))
{
- return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket);
+ return SequencingHandle(SequencingHandle::BlockedByLockedBucket(entry.second));
}
}
}
@@ -34,16 +35,17 @@ SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_sp
if (inserted.second) {
return SequencingHandle(*this, gid);
} else {
- return SequencingHandle(SequencingHandle::BlockedBy::PendingOperation);
+ return SequencingHandle(SequencingHandle::BlockedByPendingOperation());
}
}
-SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket) {
- const auto inserted = _active_buckets.insert(bucket);
+SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket,
+ const vespalib::string& token) {
+ const auto inserted = _active_buckets.insert(std::make_pair(bucket, token));
if (inserted.second) {
return SequencingHandle(*this, bucket);
} else {
- return SequencingHandle(SequencingHandle::BlockedBy::LockedBucket);
+ return SequencingHandle(SequencingHandle::BlockedByLockedBucket(inserted.first->second));
}
}
@@ -57,8 +59,7 @@ void OperationSequencer::release(const SequencingHandle& handle) {
_active_gids.erase(handle.gid());
} else {
assert(handle.has_bucket());
- [[maybe_unused]] auto erased = _active_buckets.erase(handle.bucket());
- assert(erased == 1u);
+ _active_buckets.erase(handle.bucket());
}
}
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h
index f9ff6b32e0b..72a6f547a0d 100644
--- a/storage/src/vespa/storage/distributor/operation_sequencer.h
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.h
@@ -4,7 +4,7 @@
#include <vespa/document/base/globalid.h>
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/stllike/hash_set.h>
-#include <set>
+#include <vespa/vespalib/stllike/hash_map.h>
#include <utility>
#include <variant>
@@ -26,25 +26,38 @@ class OperationSequencer;
*/
class SequencingHandle {
public:
- enum class BlockedBy {
- PendingOperation,
- LockedBucket
+ struct BlockedByPendingOperation {};
+ struct BlockedByLockedBucket {
+ vespalib::string lock_token;
+
+ BlockedByLockedBucket() = default;
+ explicit BlockedByLockedBucket(vespalib::stringref token) : lock_token(token) {}
};
private:
OperationSequencer* _sequencer;
- using HandleVariant = std::variant<document::Bucket, document::GlobalId, BlockedBy>;
- HandleVariant _handle;
+ using HandleVariant = std::variant<
+ document::Bucket,
+ document::GlobalId,
+ BlockedByPendingOperation,
+ BlockedByLockedBucket
+ >;
+ HandleVariant _handle;
public:
SequencingHandle() noexcept
: _sequencer(nullptr),
_handle()
{}
- explicit SequencingHandle(BlockedBy blocked_by)
+ explicit SequencingHandle(BlockedByPendingOperation blocked_by)
: _sequencer(nullptr),
_handle(blocked_by)
{}
+ explicit SequencingHandle(BlockedByLockedBucket blocked_by)
+ : _sequencer(nullptr),
+ _handle(std::move(blocked_by))
+ {}
+
SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid) noexcept
: _sequencer(&sequencer),
_handle(gid)
@@ -81,13 +94,18 @@ public:
[[nodiscard]] bool valid() const noexcept { return (_sequencer != nullptr); }
[[nodiscard]] bool is_blocked() const noexcept {
- return std::holds_alternative<BlockedBy>(_handle);
+ return (std::holds_alternative<BlockedByPendingOperation>(_handle) ||
+ std::holds_alternative<BlockedByLockedBucket>(_handle));
+ }
+ [[nodiscard]] bool is_blocked_by_pending_operation() const noexcept {
+ return std::holds_alternative<BlockedByPendingOperation>(_handle);
}
- [[nodiscard]] BlockedBy blocked_by() const noexcept {
- return std::get<BlockedBy>(_handle); // FIXME can actually throw
+ [[nodiscard]] bool is_blocked_by_bucket() const noexcept {
+ return std::holds_alternative<BlockedByLockedBucket>(_handle);
}
- [[nodiscard]] bool is_blocked_by(BlockedBy blocker) const noexcept {
- return (is_blocked() && blocked_by() == blocker);
+ [[nodiscard]] bool is_bucket_blocked_with_token(vespalib::stringref token) const noexcept {
+ return (std::holds_alternative<BlockedByLockedBucket>(_handle) &&
+ (std::get<BlockedByLockedBucket>(_handle).lock_token == token));
}
[[nodiscard]] bool has_bucket() const noexcept {
return std::holds_alternative<document::Bucket>(_handle);
@@ -113,11 +131,11 @@ public:
* can be acquired for that ID until the original handle has been destroyed.
*/
class OperationSequencer {
- using GidSet = vespalib::hash_set<document::GlobalId, document::GlobalId::hash>;
- using BucketSet = std::set<document::Bucket>;
+ using GidSet = vespalib::hash_set<document::GlobalId, document::GlobalId::hash>;
+ using BucketLocks = vespalib::hash_map<document::Bucket, vespalib::string, document::Bucket::hash>;
- GidSet _active_gids;
- BucketSet _active_buckets;
+ GidSet _active_gids;
+ BucketLocks _active_buckets;
friend class SequencingHandle;
public:
@@ -129,7 +147,7 @@ public:
// any bucket that may contain `id`.
SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id);
- SequencingHandle try_acquire(const document::Bucket& bucket);
+ SequencingHandle try_acquire(const document::Bucket& bucket, const vespalib::string& token);
bool is_blocked(const document::Bucket&) const noexcept;
private:
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
index 5ebf20138a4..04e64703c19 100644
--- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
@@ -5,6 +5,7 @@
#include <vespa/storage/distributor/distributormessagesender.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/operationowner.h>
+#include <vespa/storage/distributor/uuid_generator.h>
#include <cassert>
#include <vespa/log/log.h>
@@ -16,11 +17,13 @@ ReadForWriteVisitorOperationStarter::ReadForWriteVisitorOperationStarter(
std::shared_ptr<VisitorOperation> visitor_op,
OperationSequencer& operation_sequencer,
OperationOwner& stable_operation_owner,
- PendingMessageTracker& message_tracker)
+ PendingMessageTracker& message_tracker,
+ UuidGenerator& uuid_generator)
: _visitor_op(std::move(visitor_op)),
_operation_sequencer(operation_sequencer),
_stable_operation_owner(stable_operation_owner),
- _message_tracker(message_tracker)
+ _message_tracker(message_tracker),
+ _uuid_generator(uuid_generator)
{
}
@@ -46,14 +49,17 @@ void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& send
_visitor_op->fail_with_merge_pending(sender);
return;
}
- auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket);
+ auto token = _uuid_generator.generate_uuid();
+ auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket, token);
if (!bucket_handle.valid()) {
LOG(debug, "An operation is already pending for bucket %s, failing visitor",
maybe_bucket->toString().c_str());
_visitor_op->fail_with_bucket_already_locked(sender);
return;
}
- LOG(debug, "Possibly deferring start of visitor for bucket %s", maybe_bucket->toString().c_str());
+ _visitor_op->assign_put_lock_access_token(token);
+ LOG(debug, "Possibly deferring start of visitor for bucket %s, using lock token %s",
+ maybe_bucket->toString().c_str(), token.c_str());
_message_tracker.run_once_no_pending_for_bucket(
*maybe_bucket,
make_deferred_task([self = shared_from_this(), handle = std::move(bucket_handle)](TaskRunState state) mutable {
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
index a6b414e6fb5..e9391f9f133 100644
--- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
@@ -10,6 +10,7 @@ namespace storage::distributor {
class PendingMessageTracker;
class VisitorOperation;
class OperationOwner;
+class UuidGenerator;
/**
* Operation starting indirection for a visitor operation that has the semantics
@@ -31,11 +32,13 @@ class ReadForWriteVisitorOperationStarter
OperationSequencer& _operation_sequencer;
OperationOwner& _stable_operation_owner;
PendingMessageTracker& _message_tracker;
+ UuidGenerator& _uuid_generator;
public:
ReadForWriteVisitorOperationStarter(std::shared_ptr<VisitorOperation> visitor_op,
OperationSequencer& operation_sequencer,
OperationOwner& stable_operation_owner,
- PendingMessageTracker& message_tracker);
+ PendingMessageTracker& message_tracker,
+ UuidGenerator& uuid_generator);
~ReadForWriteVisitorOperationStarter() override;
const char* getName() const override { return "ReadForWriteVisitorOperationStarter"; }
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index 5a8adb26cd8..606b3931464 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "visitoroperation.h"
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/storageserver/storagemetricsset.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
@@ -823,6 +824,10 @@ VisitorOperation::sendStorageVisitor(uint16_t node,
cmd->setTimeout(timeLeft());
+ if (!_put_lock_token.empty()) {
+ cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), _put_lock_token);
+ }
+
LOG(spam, "Priority is %d", cmd->getPriority());
LOG(debug, "Sending CreateVisitor command %" PRIu64 " for storage visitor '%s' to %s",
cmd->getMsgId(),
@@ -909,4 +914,10 @@ VisitorOperation::assign_bucket_lock_handle(SequencingHandle handle)
_bucket_lock = std::move(handle);
}
+void
+VisitorOperation::assign_put_lock_access_token(const vespalib::string& token)
+{
+ _put_lock_token = token;
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index e6ad7a042dd..179727a330c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -62,6 +62,7 @@ public:
[[nodiscard]] bool is_read_for_write() const noexcept { return _is_read_for_write; }
[[nodiscard]] std::optional<document::Bucket> first_bucket_to_visit() const;
void assign_bucket_lock_handle(SequencingHandle handle);
+ void assign_put_lock_access_token(const vespalib::string& token);
private:
struct BucketInfo {
@@ -179,6 +180,7 @@ private:
mbus::TraceNode trace;
framework::MilliSecTimer _operationTimer;
SequencingHandle _bucket_lock;
+ vespalib::string _put_lock_token;
bool _sentReply;
bool _verified_and_expanded;
bool _is_read_for_write;
diff --git a/storage/src/vespa/storage/distributor/uuid_generator.h b/storage/src/vespa/storage/distributor/uuid_generator.h
new file mode 100644
index 00000000000..ae133d83fd4
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/uuid_generator.h
@@ -0,0 +1,19 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace storage::distributor {
+
+/**
+ * Generator of universally unique identifiers (not actually conforming UUIDs, as these
+ * have MSB semantics) that are expected to be effectively collision free.
+ */
+class UuidGenerator {
+public:
+ virtual ~UuidGenerator() = default;
+ // Returns a string that is guaranteed ASCII-only
+ virtual vespalib::string generate_uuid() const = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
index 8d2b4736582..2d96876b641 100644
--- a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
@@ -18,6 +18,7 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
std::vector<spi::DocEntry::UP>& entries,
HitCounter& hitCounter)
{
+ auto lock_token = make_lock_access_token();
LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size());
for (auto& entry : entries) {
if (entry->isRemove()) {
@@ -28,9 +29,20 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
hitCounter.addHit(*entry->getDocumentId(), doc_size);
auto msg = std::make_unique<documentapi::PutDocumentMessage>(entry->releaseDocument());
msg->setApproxSize(doc_size);
- msg->setCondition(documentapi::TestAndSetCondition(reindexing_bucket_lock_bypass_value()));
+ msg->setCondition(documentapi::TestAndSetCondition(lock_token));
sendMessage(std::move(msg));
}
}
+vespalib::string ReindexingVisitor::make_lock_access_token() const {
+ vespalib::string prefix = reindexing_bucket_lock_bypass_prefix();
+ vespalib::stringref passed_token = visitor_parameters().get(
+ reindexing_bucket_lock_visitor_parameter_key(),
+ vespalib::stringref(""));
+ if (passed_token.empty()) {
+ return prefix;
+ }
+ return (prefix + "=" + passed_token);
+}
+
}
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h
index 9be21f24f5f..815ef7e67fa 100644
--- a/storage/src/vespa/storage/visiting/reindexing_visitor.h
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h
@@ -21,6 +21,7 @@ public:
private:
void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override;
+ vespalib::string make_lock_access_token() const;
};
struct ReindexingVisitorFactory : public VisitorFactory {
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index 3618073da91..c846e0357b4 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -552,7 +552,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId,
}
void
-Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
+Visitor::attach(std::shared_ptr<api::CreateVisitorCommand> initiatingCmd,
const mbus::Route& controlAddress,
const mbus::Route& dataAddress,
framework::MilliSecTime timeout)
@@ -601,6 +601,12 @@ Visitor::addBoundedTrace(uint32_t level, const vespalib::string &message) {
return _trace.add(std::move(tempTrace));
}
+const vdslib::Parameters&
+Visitor::visitor_parameters() const noexcept {
+ assert(_initiatingCmd);
+ return _initiatingCmd->getParameters();
+}
+
void
Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& metrics)
{
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 6f2b33629e6..6666308a0b9 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -307,7 +307,7 @@ private:
// Used by visitor client to identify what visitor messages belong to
api::StorageMessage::Id _visitorCmdId;
api::VisitorId _visitorId;
- std::shared_ptr<api::StorageCommand> _initiatingCmd;
+ std::shared_ptr<api::CreateVisitorCommand> _initiatingCmd;
api::StorageMessage::Priority _priority;
api::ReturnCode _result;
@@ -348,6 +348,8 @@ protected:
* Returns true iff message was added to internal trace tree.
*/
bool addBoundedTrace(uint32_t level, const vespalib::string& message);
+
+ const vdslib::Parameters& visitor_parameters() const noexcept;
public:
Visitor(StorageComponent& component);
virtual ~Visitor();
@@ -468,7 +470,7 @@ public:
VisitorMessageSession::UP,
documentapi::Priority::Value);
- void attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
+ void attach(std::shared_ptr<api::CreateVisitorCommand> initiatingCmd,
const mbus::Route& controlAddress,
const mbus::Route& dataAddress,
framework::MilliSecTime timeout);