diff options
12 files changed, 41 insertions, 51 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h index b832cb6c02c..2479787eb31 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h @@ -18,7 +18,7 @@ public: DummyBucketExecutor(size_t numExecutors); ~DummyBucketExecutor() override; std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override; - void sync() override; + void sync(); private: std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; std::mutex _lock; diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 0865500d3c0..305f1c81192 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -866,14 +866,10 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste namespace { -class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { +class ExecutorRegistration : public vespalib::IDestructorCallback { public: - explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } - ~SyncExecutorOnDestruction() override { - if (_executor) { - _executor->sync(); - } - } + explicit ExecutorRegistration(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~ExecutorRegistration() override = default; private: std::shared_ptr<BucketExecutor> _executor; }; @@ -885,7 +881,7 @@ DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor) { assert(_bucket_executor.expired()); _bucket_executor = executor; - return std::make_unique<SyncExecutorOnDestruction>(executor); + return std::make_unique<ExecutorRegistration>(executor); } std::string diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h index 8237b78cca0..d1ada30959e 100644 --- a/persistence/src/vespa/persistence/spi/bucketexecutor.h +++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h @@ -29,7 +29,6 @@ public: struct BucketExecutor { virtual ~BucketExecutor() = default; virtual std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) = 0; - virtual void sync() = 0; }; } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index fd1cf7fb5a8..dd2760022c2 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -5,8 +5,9 @@ #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/gtest/gtest.h> +namespace storage::spi::dummy { class DummyBucketExecutor; } struct JobTestBase : public ::testing::TestWithParam<bool> { - std::unique_ptr<storage::spi::BucketExecutor> _bucketExecutor; + std::unique_ptr<storage::spi::dummy::DummyBucketExecutor> _bucketExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor; std::unique_ptr<searchcorespi::index::IThreadService> _master; std::shared_ptr<MyHandler> _handler; @@ -54,7 +55,7 @@ struct JobTest : public JobTestBase { std::unique_ptr<IMaintenanceJobRunner> _jobRunner; JobTest(); - ~JobTest(); + ~JobTest() override; void init(uint32_t allowedLidBloat, double allowedLidBloatFactor, double resourceLimitFactor = RESOURCE_LIMIT_FACTOR, @@ -68,7 +69,7 @@ struct JobTest : public JobTestBase { class JobDisabledByRemoveOpsTest : public JobTest { public: JobDisabledByRemoveOpsTest(); - ~JobDisabledByRemoveOpsTest(); + ~JobDisabledByRemoveOpsTest() override; void job_is_disabled_while_remove_ops_are_ongoing(bool remove_batch); void job_becomes_disabled_if_remove_ops_starts(bool remove_batch); @@ -80,7 +81,7 @@ struct MyCountJobRunner; struct MaxOutstandingJobTest : public JobTest { std::unique_ptr<MyCountJobRunner> runner; MaxOutstandingJobTest(); - ~MaxOutstandingJobTest(); + ~MaxOutstandingJobTest() override; void init(uint32_t maxOutstandingMoveOps); void assertRunToBlocked(); void assertRunToNotBlocked(); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index b188b97404d..d92a945a42a 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -741,14 +741,10 @@ PersistenceEngine::getWLock() const namespace { -class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { +class ExecutorRegistration : public vespalib::IDestructorCallback { public: - explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } - ~SyncExecutorOnDestruction() override { - if (_executor) { - _executor->sync(); - } - } + explicit ExecutorRegistration(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~ExecutorRegistration() override = default; private: std::shared_ptr<BucketExecutor> _executor; }; @@ -760,7 +756,7 @@ PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor) { assert(_bucket_executor.expired()); _bucket_executor = executor; - return std::make_unique<SyncExecutorOnDestruction>(executor); + return std::make_unique<ExecutorRegistration>(executor); } std::unique_ptr<BucketTask> @@ -772,11 +768,4 @@ PersistenceEngine::execute(const storage::spi::Bucket &bucket, std::unique_ptr<B return task; } -void PersistenceEngine::sync() { - auto bucketExecutor = get_bucket_executor(); - if (bucketExecutor) { - bucketExecutor->sync(); - } -} - } // storage diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 77dfc74d765..f1f680f63f6 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -130,7 +130,6 @@ public: WriteGuard getWLock() const; ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; } std::unique_ptr<BucketTask> execute(const Bucket &bucket, std::unique_ptr<BucketTask> task) override; - void sync() override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp index d3dae686d2d..3987282ef34 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp @@ -130,6 +130,9 @@ LidSpaceCompactionJobBase::run() } if (_scanItr && !_scanItr->valid()) { + if (!inSync()) { + return false; + } if (shouldRestartScanDocuments(_handler->getLidStatus())) { _scanItr = _handler->getIterator(); } else { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h index 759a18361f7..e91502f766c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h @@ -50,6 +50,7 @@ private: void compactLidSpace(const search::LidUsageStats &stats); bool remove_batch_is_ongoing() const; bool remove_is_ongoing() const; + virtual bool inSync() const { return true; } protected: search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); public: diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index cbf3de20b1f..a345cb9c927 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -11,6 +11,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/lambdatask.h> #include <cassert> +#include <thread> using search::DocumentMetaData; using search::LidUsageStats; @@ -34,19 +35,19 @@ CompactionJob::scanDocuments(const LidUsageStats &stats) moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone)))); })); if (failed) return false; + _startedCount.fetch_add(1, std::memory_order_relaxed); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } } } - if (!_scanItr->valid()) { - sync(); - } return false; } void CompactionJob::moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> context) { + _executedCount.fetch_add(1, std::memory_order_relaxed); + if (_stopped.load(std::memory_order_relaxed)) return; // The real lid must be sampled in the master thread. //TODO remove target lid from createMoveOperation interface auto op = _handler->createMoveOperation(meta, 0); @@ -55,6 +56,7 @@ CompactionJob::moveDocument(const search::DocumentMetaData & meta, std::shared_p if (meta.gid != op->getDocument()->getId().getGlobalId()) return; _master.execute(makeLambdaTask([this, metaThen=meta, moveOp=std::move(op), onDone=std::move(context)]() { + if (_stopped.load(std::memory_order_relaxed)) return; search::DocumentMetaData metaNow = _handler->getMetaData(metaThen.lid); if (metaNow.lid != metaThen.lid) return; if (metaNow.bucketId != metaThen.bucketId) return; @@ -82,20 +84,25 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, blockableConfig, clusterStateChangedNotifier, nodeRetired), _master(master), _bucketExecutor(bucketExecutor), - _bucketSpace(bucketSpace) -{ -} + _bucketSpace(bucketSpace), + _stopped(false), + _startedCount(0), + _executedCount(0) +{ } CompactionJob::~CompactionJob() = default; -void -CompactionJob::sync() { - _bucketExecutor.sync(); +bool +CompactionJob::inSync() const { + return _executedCount == _startedCount; } void CompactionJob::onStop() { - sync(); + _stopped = true; + while ( ! inSync() ) { + std::this_thread::sleep_for(10us); + } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index 053950ebd2f..65b847d0468 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -4,8 +4,9 @@ #include "lid_space_compaction_job_base.h" #include <vespa/document/bucket/bucketspace.h> +#include <atomic> -namespace storage::spi { struct BucketExecutor;} +namespace storage::spi { struct BucketExecutor; } namespace searchcorespi::index { struct IThreadService; } namespace vespalib { class IDestructorCallback; } namespace proton { @@ -28,11 +29,14 @@ private: IThreadService & _master; BucketExecutor &_bucketExecutor; document::BucketSpace _bucketSpace; + std::atomic<bool> _stopped; + std::atomic<size_t> _startedCount; + std::atomic<size_t> _executedCount; bool scanDocuments(const search::LidUsageStats &stats) override; void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone); void onStop() override; - void sync(); + bool inSync() const override; public: CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, std::shared_ptr<ILidSpaceCompactionHandler> handler, diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index c71a7fee424..7c175686359 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -49,10 +49,6 @@ public: return _executor.execute(bucket, std::move(task)); } - void sync() override { - _executor.sync(); - } - private: spi::BucketExecutor & _executor; }; @@ -990,8 +986,4 @@ FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketT return task; } -void -FileStorManager::sync() { -} - } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 6eaef45e9bd..a48b4e0e208 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -175,7 +175,6 @@ private: void update_reported_state_after_db_init(); std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override; - void sync() override; }; } // storage |