aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2021-10-25 15:36:40 +0200
committerGitHub <noreply@github.com>2021-10-25 15:36:40 +0200
commit23f6c9faddfbffa44f9dffbd792e1342cec9d64b (patch)
tree2f0ed317091c1ea172710356a742e35d1155235b
parent01d7ef454dae04a41308759be44c6314acaa6f45 (diff)
parenta363f37e5c195cdebfeca09e98852ad00404bfb9 (diff)
Merge pull request #19717 from vespa-engine/toregge/delay-apply-bucket-diff-state-deletion-try-2
Delay deletion of ApplyBucketState.
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp5
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp15
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h5
-rw-r--r--storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
8 files changed, 49 insertions, 9 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index d51485df38d..701e8a80d3a 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -43,6 +43,7 @@ public:
throw std::runtime_error(_fail);
}
}
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { }
void set_fail(vespalib::string fail) { _fail = std::move(fail); }
};
@@ -85,8 +86,8 @@ public:
~ApplyBucketDiffStateTestBase();
- std::unique_ptr<ApplyBucketDiffState> make_state() {
- return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
+ std::shared_ptr<ApplyBucketDiffState> make_state() {
+ return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 75e85fb4b6f..ed50730d79f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -167,11 +167,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
@@ -1438,6 +1438,9 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
+ if (GetParam()) {
+ handler.drain_async_writes();
+ }
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index ad153c41aef..556760b347e 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -12,6 +12,14 @@ using vespalib::RetainGuard;
namespace storage {
+class ApplyBucketDiffState::Deleter {
+public:
+ void operator()(ApplyBucketDiffState *raw_state) const noexcept {
+ std::unique_ptr<ApplyBucketDiffState> state(raw_state);
+ raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state));
+ }
+};
+
ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
@@ -101,4 +109,11 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke
_delayed_reply = std::move(delayed_reply);
}
+std::shared_ptr<ApplyBucketDiffState>
+ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
+{
+ std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard)));
+ return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter());
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 39f60156e66..7157c69191b 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -24,6 +24,7 @@ class MergeBucketInfoSyncer;
* for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
*/
class ApplyBucketDiffState {
+ class Deleter;
const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
spi::Bucket _bucket;
vespalib::string _fail_message;
@@ -35,8 +36,9 @@ class ApplyBucketDiffState {
MessageSender* _sender;
vespalib::RetainGuard _retain_guard;
-public:
ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
+public:
+ static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -46,6 +48,7 @@ public:
std::future<vespalib::string> get_future();
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};
}
diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
index e05991ad9e3..b3386c591e6 100644
--- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
+++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
@@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; }
namespace storage {
+class ApplyBucketDiffState;
+
/*
* Interface class for syncing bucket info during merge.
*/
@@ -13,6 +15,7 @@ class MergeBucketInfoSyncer {
public:
virtual ~MergeBucketInfoSyncer() = default;
virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0;
+ virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0;
};
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4cd0b181155..c9ba43458b1 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -11,6 +11,7 @@
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
#include <vespa/log/log.h>
@@ -23,6 +24,7 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize,
uint32_t commonMergeChainOptimalizationMinimumSize,
bool async_apply_bucket_diff)
@@ -33,7 +35,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _async_apply_bucket_diff(async_apply_bucket_diff)
+ _async_apply_bucket_diff(async_apply_bucket_diff),
+ _executor(executor)
{
}
@@ -1233,7 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
- async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
@@ -1341,7 +1344,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
@@ -1436,4 +1439,11 @@ MergeHandler::configure(bool async_apply_bucket_diff) noexcept
_async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
}
+void
+MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const
+{
+ auto bucket_id = state->get_bucket().getBucketId();
+ _executor.execute(bucket_id.getId(), [state = std::move(state)]() { });
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index e6b4d047209..4daec4c0689 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -22,6 +22,8 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/util/monitored_refcount.h>
+namespace vespalib { class ISequencedTaskExecutor; }
+
namespace storage {
namespace spi {
@@ -45,6 +47,7 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize = 4190208,
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
@@ -67,6 +70,7 @@ public:
spi::Context& context,
std::shared_ptr<ApplyBucketDiffState> async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
@@ -85,6 +89,7 @@ private:
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
std::atomic<bool> _async_apply_bucket_diff;
+ vespalib::ISequencedTaskExecutor& _executor;
MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 761021a9612..d03c9a6d111 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, component.cluster_context(), _clock,
+ _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
cfg.bucketMergeChunkSize,
cfg.commonMergeChainOptimalizationMinimumSize,
cfg.asyncApplyBucketDiff),