From 6b2976fd5e2244b41f1bf627d9621a695a41b1f8 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Tue, 26 Oct 2021 16:58:33 +0200 Subject: Handover tracker to ApplyBucketDiffState on exceptions. --- storage/src/tests/persistence/mergehandlertest.cpp | 8 +++++ .../src/tests/persistence/persistencetestutils.cpp | 36 ++++++++++++++++++++++ .../src/tests/persistence/persistencetestutils.h | 36 ++++++++++++++++++++++ .../persistence/apply_bucket_diff_state.cpp | 6 ++++ .../storage/persistence/apply_bucket_diff_state.h | 1 + .../src/vespa/storage/persistence/mergehandler.cpp | 25 +++++++++++++++ 6 files changed, 112 insertions(+) (limited to 'storage') diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index ed50730d79f..4bb906d4baf 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -56,6 +56,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils, createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask); + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return createLockedTracker(cmd, bucket); + } + struct ExpectedExceptionSpec // Try saying this out loud 3 times in a row. { uint32_t mask; @@ -308,6 +313,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) auto cmd = std::make_shared(_bucket, _nodes); MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP(); + tracker1.reset(); if (midChain) { LOG(debug, "Check state"); @@ -1219,6 +1225,7 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { auto applyBucketDiffReply = std::dynamic_pointer_cast(std::move(*tracker).stealReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); } + tracker.reset(); auto cmd = std::make_shared(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -1326,6 +1333,7 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) auto cmd = std::make_shared(_bucket, _nodes, _maxTimestamp); cmd->setSourceIndex(1234); MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); + tracker.reset(); ASSERT_EQ(1u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType()); size_t baseline_diff_size = 0; diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 4085288b45f..f3560bfa2cb 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -68,10 +68,46 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() { } } +PersistenceTestUtils::MockBucketLocks::MockBucketLocks() + : _mutex(), + _cv(), + _locked_buckets() +{ +} + +PersistenceTestUtils::MockBucketLocks::~MockBucketLocks() +{ + std::unique_lock guard(_mutex); + while (!_locked_buckets.empty()) { + _cv.wait(guard); + } +} + +void +PersistenceTestUtils::MockBucketLocks::lock(document::Bucket bucket) +{ + std::unique_lock guard(_mutex); + while (_locked_buckets.count(bucket) != 0) { + _cv.wait(guard); + } + _locked_buckets.insert(bucket); +} + +void +PersistenceTestUtils::MockBucketLocks::unlock(document::Bucket bucket) +{ + std::unique_lock guard(_mutex); + auto itr = _locked_buckets.find(bucket); + assert(itr != _locked_buckets.end()); + _locked_buckets.erase(itr); + _cv.notify_all(); +} + PersistenceTestUtils::PersistenceTestUtils() : _env(std::make_unique("todo-make-unique-persistencetestutils")), _replySender(), _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler), + _mock_bucket_locks(), _persistenceHandler() { setupExecutor(1); diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index d7bf5b2f73f..de238b9eeb4 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -56,6 +56,35 @@ public: private: document::Bucket _bucket; }; + class MockBucketLocks { + std::mutex _mutex; + std::condition_variable _cv; + std::set _locked_buckets; + public: + MockBucketLocks(); + ~MockBucketLocks(); + void lock(document::Bucket bucket); + void unlock(document::Bucket bucket); + }; + + class MockBucketLock : public FileStorHandler::BucketLockInterface + { + public: + MockBucketLock(document::Bucket bucket, MockBucketLocks &locks) noexcept : _bucket(bucket), _locks(locks) { _locks.lock(bucket); } + ~MockBucketLock() { _locks.unlock(_bucket); } + const document::Bucket &getBucket() const override { + return _bucket; + } + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Exclusive; + } + static std::shared_ptr make(document::Bucket bucket, MockBucketLocks& locks) { + return std::make_shared(bucket, locks); + } + private: + document::Bucket _bucket; + MockBucketLocks& _locks; + }; struct ReplySender : public MessageSender { void sendCommand(const std::shared_ptr &) override { @@ -73,6 +102,7 @@ public: std::unique_ptr _sequenceTaskExecutor; ReplySender _replySender; BucketOwnershipNotifier _bucketOwnershipNotifier; + MockBucketLocks _mock_bucket_locks; std::unique_ptr _persistenceHandler; PersistenceTestUtils(); @@ -114,6 +144,12 @@ public: _replySender, NoBucketLock::make(bucket), std::move(cmd)); } + MessageTracker::UP + createLockedTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return MessageTracker::createForTesting(framework::MilliSecTimer(getEnv()._component.getClock()), getEnv(), + _replySender, MockBucketLock::make(bucket, _mock_bucket_locks), std::move(cmd)); + } + api::ReturnCode fetchResult(const MessageTracker::UP & tracker) { if (tracker) { diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index 556760b347e..97aba76dfac 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -109,6 +109,12 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr&& tracke _delayed_reply = std::move(delayed_reply); } +void +ApplyBucketDiffState::set_tracker(std::unique_ptr&& tracker) +{ + _tracker = std::move(tracker); +} + std::shared_ptr ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) { diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 7157c69191b..99fd5bbd1d0 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -48,6 +48,7 @@ public: std::future get_future(); void set_delayed_reply(std::unique_ptr&& tracker, std::shared_ptr&& delayed_reply); void set_delayed_reply(std::unique_ptr&& tracker, MessageSender& sender, std::shared_ptr&& delayed_reply); + void set_tracker(std::unique_ptr&& tracker); const spi::Bucket& get_bucket() const noexcept { return _bucket; } }; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index c9ba43458b1..6d47a073977 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -846,6 +846,28 @@ public: void deactivate() { _active = false; } }; +class TrackerHandoverGuard { + std::shared_ptr& _async_results; + std::unique_ptr& _tracker; +public: + TrackerHandoverGuard(std::shared_ptr& async_results, + std::unique_ptr& tracker) + : _async_results(async_results), + _tracker(tracker) + { + } + ~TrackerHandoverGuard() { handover(); } + void handover(); +}; + +void +TrackerHandoverGuard::handover() +{ + if (_async_results && _tracker) { + _async_results->set_tracker(std::move(_tracker)); + } +} + MessageTracker::UP MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) const { @@ -1215,6 +1237,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra spi::Bucket bucket(cmd.getBucket()); std::shared_ptr async_results; + TrackerHandoverGuard tracker_handover_guard(async_results, tracker); LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1300,6 +1323,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->dontReply(); } + tracker_handover_guard.handover(); return tracker; } @@ -1309,6 +1333,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); std::shared_ptr async_results; + TrackerHandoverGuard tracker_handover_guard(async_results, tracker); std::vector& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); -- cgit v1.2.3