diff options
author | Geir Storli <geirst@yahooinc.com> | 2021-10-25 15:36:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-25 15:36:40 +0200 |
commit | 23f6c9faddfbffa44f9dffbd792e1342cec9d64b (patch) | |
tree | 2f0ed317091c1ea172710356a742e35d1155235b | |
parent | 01d7ef454dae04a41308759be44c6314acaa6f45 (diff) | |
parent | a363f37e5c195cdebfeca09e98852ad00404bfb9 (diff) |
Merge pull request #19717 from vespa-engine/toregge/delay-apply-bucket-diff-state-deletion-try-2
Delay deletion of ApplyBucketState.
8 files changed, 49 insertions, 9 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index d51485df38d..701e8a80d3a 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -43,6 +43,7 @@ public: throw std::runtime_error(_fail); } } + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { } void set_fail(vespalib::string fail) { _fail = std::move(fail); } }; @@ -85,8 +86,8 @@ public: ~ApplyBucketDiffStateTestBase(); - std::unique_ptr<ApplyBucketDiffState> make_state() { - return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); + std::shared_ptr<ApplyBucketDiffState> make_state() { + return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 75e85fb4b6f..ed50730d79f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -167,11 +167,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils, MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); } std::shared_ptr<api::StorageMessage> get_queued_reply() { @@ -1438,6 +1438,9 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } + if (GetParam()) { + handler.drain_async_writes(); + } ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index ad153c41aef..556760b347e 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -12,6 +12,14 @@ using vespalib::RetainGuard; namespace storage { +class ApplyBucketDiffState::Deleter { +public: + void operator()(ApplyBucketDiffState *raw_state) const noexcept { + std::unique_ptr<ApplyBucketDiffState> state(raw_state); + raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state)); + } +}; + ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), @@ -101,4 +109,11 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke _delayed_reply = std::move(delayed_reply); } +std::shared_ptr<ApplyBucketDiffState> +ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) +{ + std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard))); + return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter()); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 39f60156e66..7157c69191b 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -24,6 +24,7 @@ class MergeBucketInfoSyncer; * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. */ class ApplyBucketDiffState { + class Deleter; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; spi::Bucket _bucket; vespalib::string _fail_message; @@ -35,8 +36,9 @@ class ApplyBucketDiffState { MessageSender* _sender; vespalib::RetainGuard _retain_guard; -public: ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); +public: + static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -46,6 +48,7 @@ public: std::future<vespalib::string> get_future(); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); + const spi::Bucket& get_bucket() const noexcept { return _bucket; } }; } diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h index e05991ad9e3..b3386c591e6 100644 --- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h +++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h @@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; } namespace storage { +class ApplyBucketDiffState; + /* * Interface class for syncing bucket info during merge. */ @@ -13,6 +15,7 @@ class MergeBucketInfoSyncer { public: virtual ~MergeBucketInfoSyncer() = default; virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0; + virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0; }; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4cd0b181155..c9ba43458b1 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -11,6 +11,7 @@ #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> #include <vespa/log/log.h> @@ -23,6 +24,7 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize, uint32_t commonMergeChainOptimalizationMinimumSize, bool async_apply_bucket_diff) @@ -33,7 +35,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _async_apply_bucket_diff(async_apply_bucket_diff) + _async_apply_bucket_diff(async_apply_bucket_diff), + _executor(executor) { } @@ -1233,7 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); - async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); @@ -1341,7 +1344,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); @@ -1436,4 +1439,11 @@ MergeHandler::configure(bool async_apply_bucket_diff) noexcept _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); } +void +MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const +{ + auto bucket_id = state->get_bucket().getBucketId(); + _executor.execute(bucket_id.getId(), [state = std::move(state)]() { }); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index e6b4d047209..4daec4c0689 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -22,6 +22,8 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/util/monitored_refcount.h> +namespace vespalib { class ISequencedTaskExecutor; } + namespace storage { namespace spi { @@ -45,6 +47,7 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, + vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize = 4190208, uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); @@ -67,6 +70,7 @@ public: spi::Context& context, std::shared_ptr<ApplyBucketDiffState> async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; + void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; @@ -85,6 +89,7 @@ private: const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; std::atomic<bool> _async_apply_bucket_diff; + vespalib::ISequencedTaskExecutor& _executor; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 761021a9612..d03c9a6d111 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, component.cluster_context(), _clock, + _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize, cfg.asyncApplyBucketDiff), |