diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-10-25 12:45:54 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-10-25 12:45:54 +0200 |
commit | a363f37e5c195cdebfeca09e98852ad00404bfb9 (patch) | |
tree | e73f0bc69d9af2a85371c23ee3e52340e6098ebf | |
parent | 98b98eec39f56be04bd6ddc4c306302434525a41 (diff) |
Delay deletion of ApplyBucketState.
8 files changed, 49 insertions, 9 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index d51485df38d..701e8a80d3a 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -43,6 +43,7 @@ public: throw std::runtime_error(_fail); } } + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { } void set_fail(vespalib::string fail) { _fail = std::move(fail); } }; @@ -85,8 +86,8 @@ public: ~ApplyBucketDiffStateTestBase(); - std::unique_ptr<ApplyBucketDiffState> make_state() { - return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); + std::shared_ptr<ApplyBucketDiffState> make_state() { + return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 017b8ce2b92..c43f286b609 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -167,11 +167,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils, MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); } std::shared_ptr<api::StorageMessage> get_queued_reply() { @@ -1440,6 +1440,9 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } + if (GetParam()) { + handler.drain_async_writes(); + } ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index ad153c41aef..556760b347e 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -12,6 +12,14 @@ using vespalib::RetainGuard; namespace storage { +class ApplyBucketDiffState::Deleter { +public: + void operator()(ApplyBucketDiffState *raw_state) const noexcept { + std::unique_ptr<ApplyBucketDiffState> state(raw_state); + raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state)); + } +}; + ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), @@ -101,4 +109,11 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke _delayed_reply = std::move(delayed_reply); } +std::shared_ptr<ApplyBucketDiffState> +ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) +{ + std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard))); + return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter()); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 39f60156e66..7157c69191b 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -24,6 +24,7 @@ class MergeBucketInfoSyncer; * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. */ class ApplyBucketDiffState { + class Deleter; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; spi::Bucket _bucket; vespalib::string _fail_message; @@ -35,8 +36,9 @@ class ApplyBucketDiffState { MessageSender* _sender; vespalib::RetainGuard _retain_guard; -public: ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); +public: + static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -46,6 +48,7 @@ public: std::future<vespalib::string> get_future(); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); + const spi::Bucket& get_bucket() const noexcept { return _bucket; } }; } diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h index e05991ad9e3..b3386c591e6 100644 --- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h +++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h @@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; } namespace storage { +class ApplyBucketDiffState; + /* * Interface class for syncing bucket info during merge. */ @@ -13,6 +15,7 @@ class MergeBucketInfoSyncer { public: virtual ~MergeBucketInfoSyncer() = default; virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0; + virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0; }; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 77e7762ec9a..9fe4b03558e 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -11,6 +11,7 @@ #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> #include <future> @@ -24,6 +25,7 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize, uint32_t commonMergeChainOptimalizationMinimumSize, bool async_apply_bucket_diff) @@ -34,7 +36,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _async_apply_bucket_diff(async_apply_bucket_diff) + _async_apply_bucket_diff(async_apply_bucket_diff), + _executor(executor) { } @@ -1249,7 +1252,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); - async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); @@ -1357,7 +1360,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); @@ -1452,4 +1455,11 @@ MergeHandler::configure(bool async_apply_bucket_diff) noexcept _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); } +void +MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const +{ + auto bucket_id = state->get_bucket().getBucketId(); + _executor.execute(bucket_id.getId(), [state = std::move(state)]() { }); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 17cfb847d2c..c98c50f2841 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -22,6 +22,8 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/util/monitored_refcount.h> +namespace vespalib { class ISequencedTaskExecutor; } + namespace storage { namespace spi { @@ -45,6 +47,7 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize = 4190208, uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); @@ -67,6 +70,7 @@ public: spi::Context& context, std::shared_ptr<ApplyBucketDiffState> async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; @@ -85,6 +89,7 @@ private: const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; std::atomic<bool> _async_apply_bucket_diff; + vespalib::ISequencedTaskExecutor& _executor; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index aa1a9c136fd..393d36ca848 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, component.cluster_context(), _clock, + _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize, cfg.asyncApplyBucketDiff), |