aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-03 12:18:48 +0100
committerGitHub <noreply@github.com>2020-12-03 12:18:48 +0100
commit31457e8d51b51c62d29bbb000e6e0f2183c5d911 (patch)
treeab4e4c140e184bff3d479c89625faf7dc616c0bf /storage
parenta335a2b692ebaf1bb77182aeec45023c6b56d2a7 (diff)
parent21786e66e55f6c39dc7c7998b9575e8cbad2e806 (diff)
Merge pull request #15611 from vespa-engine/vekterli/allow-starting-deferred-tasks-concurrently-with-reads
Allow starting deferred tasks concurrently with reads
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortestutil.h3
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp38
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp99
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp50
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h5
5 files changed, 153 insertions, 42 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 3dc71bcb433..a6bd9d5d84c 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -175,6 +175,9 @@ public:
BucketDatabase::Entry getBucket(const document::BucketId& bId) const;
std::vector<document::BucketSpace> getBucketSpaces() const;
+
+ DistributorMessageSenderStub& sender() noexcept { return _sender; }
+ const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 71b51c9a7b6..72365c61597 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -93,6 +93,16 @@ public:
return cmd;
}
+ std::shared_ptr<api::GetCommand> create_get_to_node(uint16_t node) const {
+ document::BucketId bucket(16, 1234);
+ auto cmd = std::make_shared<api::GetCommand>(
+ makeDocumentBucket(bucket),
+ document::DocumentId("id::testdoctype1:n=1234:foo"),
+ "[all]");
+ cmd->setAddress(makeStorageAddress(node));
+ return cmd;
+ }
+
PendingMessageTracker& tracker() { return *_tracker; }
auto& clock() { return _clock; }
@@ -444,7 +454,7 @@ document::BucketId bucket_of(const document::DocumentId& id) {
}
-TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ops) {
+TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_write_ops) {
Fixture f;
auto cmd = f.createPutToNode(0);
auto bucket_id = bucket_of(cmd->getDocumentId());
@@ -455,6 +465,18 @@ TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_
EXPECT_EQ(state, TaskRunState::OK);
}
+TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_only_pending_read_ops) {
+ Fixture f;
+ auto cmd = f.create_get_to_node(0);
+ f.tracker().insert(cmd);
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::Aborted;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::OK);
+}
+
TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
@@ -468,6 +490,20 @@ TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_c
EXPECT_EQ(state, TaskRunState::OK);
}
+TEST_F(PendingMessageTrackerTest, deferred_task_can_be_started_with_pending_read_op) {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::Aborted;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::Aborted);
+ f.tracker().insert(f.create_get_to_node(0)); // Concurrent Get and Put
+ f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be allowed to run
+ EXPECT_EQ(state, TaskRunState::OK);
+}
+
TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 76112b1c729..daa2ca94bb3 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -19,6 +19,14 @@ using document::BucketId;
namespace storage::distributor {
+namespace {
+
+Bucket default_bucket(BucketId id) {
+ return Bucket(document::FixedBucketSpaces::default_space(), id);
+}
+
+}
+
struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
document::TestDocMan _test_doc_man;
VisitorOperation::Config _default_config;
@@ -46,10 +54,6 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
close();
}
- static Bucket default_bucket(BucketId id) {
- return Bucket(document::FixedBucketSpaces::default_space(), id);
- }
-
std::shared_ptr<VisitorOperation> create_nested_visitor_op(bool valid_command = true) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(
document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", "");
@@ -88,37 +92,80 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
}
+namespace {
+
+struct ConcurrentMutationFixture {
+ ReadForWriteVisitorOperationStarterTest& _test;
+ std::shared_ptr<api::StorageCommand> _mutation;
+
+ explicit ConcurrentMutationFixture(ReadForWriteVisitorOperationStarterTest& test) : _test(test) {}
+
+ void block_bucket_with_mutation() {
+ // Pending mutating op to same bucket, prevents visitor from starting
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _test._test_doc_man.getTypeRepo(),
+ *_test._test_doc_man.getTypeRepo().getDocumentType("testdoctype1"),
+ document::DocumentId("id::testdoctype1:n=4:foo"));
+ auto update_cmd = std::make_shared<api::UpdateCommand>(
+ default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0));
+
+ Operation::SP mutating_op;
+ _test.getExternalOperationHandler().handleMessage(update_cmd, mutating_op);
+ ASSERT_TRUE(mutating_op);
+ _test._op_owner->start(mutating_op, OperationStarter::Priority(120));
+ ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0",
+ _test.sender().getCommands(true, true));
+ _mutation = _test.sender().command(0);
+ // Since pending message tracking normally happens in the distributor itself during sendUp,
+ // we have to emulate this and explicitly insert the sent message into the pending mapping.
+ _test.getDistributor().getPendingMessageTracker().insert(_mutation);
+ }
+
+ void unblock_bucket() {
+ // Pretend update operation completed
+ auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply());
+ _test.getDistributor().getPendingMessageTracker().reply(*update_reply);
+ _test._op_owner->handleReply(update_reply);
+ }
+};
+
+}
+
TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_start_deferred_if_pending_ops_to_bucket) {
+ ConcurrentMutationFixture f(*this);
auto op = create_rfw_op(create_nested_visitor_op(true));
- // Pending mutating op to same bucket, prevents visitor from starting
- auto update = std::make_shared<document::DocumentUpdate>(
- _test_doc_man.getTypeRepo(),
- *_test_doc_man.getTypeRepo().getDocumentType("testdoctype1"),
- document::DocumentId("id::testdoctype1:n=4:foo"));
- auto update_cmd = std::make_shared<api::UpdateCommand>(
- default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0));
-
- Operation::SP mutating_op;
- getExternalOperationHandler().handleMessage(update_cmd, mutating_op);
- ASSERT_TRUE(mutating_op);
- _op_owner->start(mutating_op, OperationStarter::Priority(120));
- ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0",
- _sender.getCommands(true, true));
- // Since pending message tracking normally happens in the distributor itself during sendUp,
- // we have to emulate this and explicitly insert the sent message into the pending mapping.
- getDistributor().getPendingMessageTracker().insert(_sender.command(0));
+ ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation());
_op_owner->start(op, OperationStarter::Priority(120));
// Nothing started yet
ASSERT_EQ("", _sender.getCommands(true, false, 1));
-
- // Pretend update operation completed
- auto update_reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply());
- getDistributor().getPendingMessageTracker().reply(*update_reply);
- _op_owner->handleReply(update_reply);
+ ASSERT_NO_FATAL_FAILURE(f.unblock_bucket());
// Visitor should now be started!
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true, false, 1));
}
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_bounced_if_bucket_removed_from_db_before_deferred_start) {
+ ConcurrentMutationFixture f(*this);
+ auto op = create_rfw_op(create_nested_visitor_op(true));
+ ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation());
+
+ _op_owner->start(op, OperationStarter::Priority(120));
+ // Nothing started yet
+ ASSERT_EQ("", _sender.getCommands(true, false, 1));
+
+ // Simulate that ownership of bucket has changed, or replica has gone down.
+ removeFromBucketDB(_sub_bucket);
+ ASSERT_NO_FATAL_FAILURE(f.unblock_bucket());
+
+ // No visitor should be sent to the content node
+ ASSERT_EQ("", _sender.getCommands(true, false, 1));
+ // Instead, we should get a "bucket not found" transient error bounce back to the client.
+ EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
+ "ReturnCode(BUCKET_NOT_FOUND),"
+ "UpdateReply(id::testdoctype1:n=4:foo, BucketId(0x0000000000000000), "
+ "timestamp 1, timestamp of updated doc: 0) ReturnCode(NONE)",
+ _sender.getReplies(false, true));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 8027b5349f9..44ab91528f2 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -121,7 +121,7 @@ PendingMessageTracker::reply(const api::StorageReply& r)
}
LOG(debug, "Erased message with id %" PRIu64, msgId);
msgs.erase(msgId);
- auto deferred_tasks = get_deferred_ops_if_bucket_pending_drained(bucket);
+ auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket);
// Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker.
// To avoid deadlocking, we run the tasks outside the lock.
// TODO remove locking entirely... Only exists for status pages!
@@ -143,23 +143,49 @@ bool is_empty_range(const Range& range) noexcept {
return (range.first == range.second);
}
+template <typename Range>
+bool range_is_empty_or_only_has_read_ops(const Range& range) noexcept {
+ if (is_empty_range(range)) {
+ return true;
+ }
+ // Number of ops to check is expected to be small in the common case
+ for (auto iter = range.first; iter != range.second; ++iter) {
+ switch (iter->msgType) {
+ case api::MessageType::GET_ID:
+ case api::MessageType::STAT_ID:
+ case api::MessageType::VISITOR_CREATE_ID:
+ case api::MessageType::VISITOR_DESTROY_ID:
+ continue;
+ default:
+ return false;
+ }
+ }
+ return true;
+}
+
+}
+
+bool
+PendingMessageTracker::bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept
+{
+ auto& bucket_idx = boost::multi_index::get<2>(_messages);
+ auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
+ return range_is_empty_or_only_has_read_ops(pending_tasks_for_bucket);
}
std::vector<std::unique_ptr<DeferredTask>>
-PendingMessageTracker::get_deferred_ops_if_bucket_pending_drained(const document::Bucket& bucket)
+PendingMessageTracker::get_deferred_ops_if_bucket_writes_drained(const document::Bucket& bucket)
{
- if (_deferred_bucket_tasks.empty()) {
+ if (_deferred_read_tasks.empty()) {
return {};
}
std::vector<std::unique_ptr<DeferredTask>> tasks;
- auto& bucket_idx = boost::multi_index::get<2>(_messages);
- auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
- if (is_empty_range(pending_tasks_for_bucket)) {
- auto waiting_tasks = _deferred_bucket_tasks.equal_range(bucket);
+ if (bucket_has_no_pending_write_ops(bucket)) {
+ auto waiting_tasks = _deferred_read_tasks.equal_range(bucket);
for (auto task_iter = waiting_tasks.first; task_iter != waiting_tasks.second; ++task_iter) {
tasks.emplace_back(std::move(task_iter->second));
}
- _deferred_bucket_tasks.erase(waiting_tasks.first, waiting_tasks.second);
+ _deferred_read_tasks.erase(waiting_tasks.first, waiting_tasks.second);
}
return tasks;
}
@@ -168,13 +194,11 @@ void
PendingMessageTracker::run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task)
{
std::unique_lock guard(_lock);
- auto& bucket_idx = boost::multi_index::get<2>(_messages);
- const auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
- if (is_empty_range(pending_tasks_for_bucket)) {
+ if (bucket_has_no_pending_write_ops(bucket)) {
guard.unlock(); // Must not be held whilst running task, or else recursive sends will deadlock.
task->run(TaskRunState::OK); // Nothing pending, run immediately.
} else {
- _deferred_bucket_tasks.emplace(bucket, std::move(task));
+ _deferred_read_tasks.emplace(bucket, std::move(task));
}
}
@@ -182,7 +206,7 @@ void
PendingMessageTracker::abort_deferred_tasks()
{
std::lock_guard guard(_lock);
- for (auto& task : _deferred_bucket_tasks) {
+ for (auto& task : _deferred_read_tasks) {
task.second->run(TaskRunState::Aborted);
}
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 39ea5c9c1a6..51c112152b6 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -189,7 +189,7 @@ private:
framework::Component _component;
NodeInfo _nodeInfo;
std::chrono::seconds _nodeBusyDuration;
- DeferredBucketTaskMap _deferred_bucket_tasks;
+ DeferredBucketTaskMap _deferred_read_tasks;
// Since distributor is currently single-threaded, this will only
// contend when status page is being accessed. It is, however, required
@@ -213,7 +213,8 @@ private:
void getStatusPerBucket(std::ostream& out) const;
TimePoint currentTime() const;
- std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_pending_drained(const document::Bucket&);
+ [[nodiscard]] bool bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept;
+ std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_writes_drained(const document::Bucket&);
};
}