aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-10-26 16:58:33 +0200
committerTor Egge <Tor.Egge@online.no>2021-10-26 16:58:33 +0200
commit6b2976fd5e2244b41f1bf627d9621a695a41b1f8 (patch)
tree41429e3b729a722ee999ac23d3d052ce13a2ead7
parent2712b0b3f8a240f38c3cfd398c930314b7c81388 (diff)
Handover tracker to ApplyBucketDiffState on exceptions.
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp8
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp36
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h36
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp25
6 files changed, 112 insertions, 0 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index ed50730d79f..4bb906d4baf 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -56,6 +56,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
createDummyGetBucketDiff(int timestampOffset,
uint16_t hasMask);
+ MessageTracker::UP
+ createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return createLockedTracker(cmd, bucket);
+ }
+
struct ExpectedExceptionSpec // Try saying this out loud 3 times in a row.
{
uint32_t mask;
@@ -308,6 +313,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
+ tracker1.reset();
if (midChain) {
LOG(debug, "Check state");
@@ -1219,6 +1225,7 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
ASSERT_TRUE(applyBucketDiffReply.get());
}
+ tracker.reset();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -1326,6 +1333,7 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
cmd->setSourceIndex(1234);
MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
+ tracker.reset();
ASSERT_EQ(1u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
size_t baseline_diff_size = 0;
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 4085288b45f..f3560bfa2cb 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -68,10 +68,46 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() {
}
}
+PersistenceTestUtils::MockBucketLocks::MockBucketLocks()
+ : _mutex(),
+ _cv(),
+ _locked_buckets()
+{
+}
+
+PersistenceTestUtils::MockBucketLocks::~MockBucketLocks()
+{
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (!_locked_buckets.empty()) {
+ _cv.wait(guard);
+ }
+}
+
+void
+PersistenceTestUtils::MockBucketLocks::lock(document::Bucket bucket)
+{
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (_locked_buckets.count(bucket) != 0) {
+ _cv.wait(guard);
+ }
+ _locked_buckets.insert(bucket);
+}
+
+void
+PersistenceTestUtils::MockBucketLocks::unlock(document::Bucket bucket)
+{
+ std::unique_lock<std::mutex> guard(_mutex);
+ auto itr = _locked_buckets.find(bucket);
+ assert(itr != _locked_buckets.end());
+ _locked_buckets.erase(itr);
+ _cv.notify_all();
+}
+
PersistenceTestUtils::PersistenceTestUtils()
: _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")),
_replySender(),
_bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler),
+ _mock_bucket_locks(),
_persistenceHandler()
{
setupExecutor(1);
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index d7bf5b2f73f..de238b9eeb4 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -56,6 +56,35 @@ public:
private:
document::Bucket _bucket;
};
+ class MockBucketLocks {
+ std::mutex _mutex;
+ std::condition_variable _cv;
+ std::set<document::Bucket> _locked_buckets;
+ public:
+ MockBucketLocks();
+ ~MockBucketLocks();
+ void lock(document::Bucket bucket);
+ void unlock(document::Bucket bucket);
+ };
+
+ class MockBucketLock : public FileStorHandler::BucketLockInterface
+ {
+ public:
+ MockBucketLock(document::Bucket bucket, MockBucketLocks &locks) noexcept : _bucket(bucket), _locks(locks) { _locks.lock(bucket); }
+ ~MockBucketLock() { _locks.unlock(_bucket); }
+ const document::Bucket &getBucket() const override {
+ return _bucket;
+ }
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Exclusive;
+ }
+ static std::shared_ptr<MockBucketLock> make(document::Bucket bucket, MockBucketLocks& locks) {
+ return std::make_shared<MockBucketLock>(bucket, locks);
+ }
+ private:
+ document::Bucket _bucket;
+ MockBucketLocks& _locks;
+ };
struct ReplySender : public MessageSender {
void sendCommand(const std::shared_ptr<api::StorageCommand> &) override {
@@ -73,6 +102,7 @@ public:
std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor;
ReplySender _replySender;
BucketOwnershipNotifier _bucketOwnershipNotifier;
+ MockBucketLocks _mock_bucket_locks;
std::unique_ptr<PersistenceHandler> _persistenceHandler;
PersistenceTestUtils();
@@ -114,6 +144,12 @@ public:
_replySender, NoBucketLock::make(bucket), std::move(cmd));
}
+ MessageTracker::UP
+ createLockedTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return MessageTracker::createForTesting(framework::MilliSecTimer(getEnv()._component.getClock()), getEnv(),
+ _replySender, MockBucketLock::make(bucket, _mock_bucket_locks), std::move(cmd));
+ }
+
api::ReturnCode
fetchResult(const MessageTracker::UP & tracker) {
if (tracker) {
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index 556760b347e..97aba76dfac 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -109,6 +109,12 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke
_delayed_reply = std::move(delayed_reply);
}
+void
+ApplyBucketDiffState::set_tracker(std::unique_ptr<MessageTracker>&& tracker)
+{
+ _tracker = std::move(tracker);
+}
+
std::shared_ptr<ApplyBucketDiffState>
ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
{
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 7157c69191b..99fd5bbd1d0 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -48,6 +48,7 @@ public:
std::future<vespalib::string> get_future();
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ void set_tracker(std::unique_ptr<MessageTracker>&& tracker);
const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index c9ba43458b1..6d47a073977 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -846,6 +846,28 @@ public:
void deactivate() { _active = false; }
};
+class TrackerHandoverGuard {
+ std::shared_ptr<ApplyBucketDiffState>& _async_results;
+ std::unique_ptr<MessageTracker>& _tracker;
+public:
+ TrackerHandoverGuard(std::shared_ptr<ApplyBucketDiffState>& async_results,
+ std::unique_ptr<MessageTracker>& tracker)
+ : _async_results(async_results),
+ _tracker(tracker)
+ {
+ }
+ ~TrackerHandoverGuard() { handover(); }
+ void handover();
+};
+
+void
+TrackerHandoverGuard::handover()
+{
+ if (_async_results && _tracker) {
+ _async_results->set_tracker(std::move(_tracker));
+ }
+}
+
MessageTracker::UP
MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) const
{
@@ -1215,6 +1237,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
spi::Bucket bucket(cmd.getBucket());
std::shared_ptr<ApplyBucketDiffState> async_results;
+ TrackerHandoverGuard tracker_handover_guard(async_results, tracker);
LOG(debug, "%s", cmd.toString().c_str());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
@@ -1300,6 +1323,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
tracker->dontReply();
}
+ tracker_handover_guard.handover();
return tracker;
}
@@ -1309,6 +1333,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
std::shared_ptr<ApplyBucketDiffState> async_results;
+ TrackerHandoverGuard tracker_handover_guard(async_results, tracker);
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());