aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h2
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp12
-rw-r--r--persistence/src/vespa/persistence/spi/bucketexecutor.h1
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h1
12 files changed, 41 insertions, 51 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
index b832cb6c02c..2479787eb31 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -18,7 +18,7 @@ public:
DummyBucketExecutor(size_t numExecutors);
~DummyBucketExecutor() override;
std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
- void sync() override;
+ void sync();
private:
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
std::mutex _lock;
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 0865500d3c0..305f1c81192 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -866,14 +866,10 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste
namespace {
-class SyncExecutorOnDestruction : public vespalib::IDestructorCallback {
+class ExecutorRegistration : public vespalib::IDestructorCallback {
public:
- explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
- ~SyncExecutorOnDestruction() override {
- if (_executor) {
- _executor->sync();
- }
- }
+ explicit ExecutorRegistration(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
+ ~ExecutorRegistration() override = default;
private:
std::shared_ptr<BucketExecutor> _executor;
};
@@ -885,7 +881,7 @@ DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor)
{
assert(_bucket_executor.expired());
_bucket_executor = executor;
- return std::make_unique<SyncExecutorOnDestruction>(executor);
+ return std::make_unique<ExecutorRegistration>(executor);
}
std::string
diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h
index 8237b78cca0..d1ada30959e 100644
--- a/persistence/src/vespa/persistence/spi/bucketexecutor.h
+++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h
@@ -29,7 +29,6 @@ public:
struct BucketExecutor {
virtual ~BucketExecutor() = default;
virtual std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) = 0;
- virtual void sync() = 0;
};
}
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
index fd1cf7fb5a8..dd2760022c2 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
@@ -5,8 +5,9 @@
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
+namespace storage::spi::dummy { class DummyBucketExecutor; }
struct JobTestBase : public ::testing::TestWithParam<bool> {
- std::unique_ptr<storage::spi::BucketExecutor> _bucketExecutor;
+ std::unique_ptr<storage::spi::dummy::DummyBucketExecutor> _bucketExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor;
std::unique_ptr<searchcorespi::index::IThreadService> _master;
std::shared_ptr<MyHandler> _handler;
@@ -54,7 +55,7 @@ struct JobTest : public JobTestBase {
std::unique_ptr<IMaintenanceJobRunner> _jobRunner;
JobTest();
- ~JobTest();
+ ~JobTest() override;
void init(uint32_t allowedLidBloat,
double allowedLidBloatFactor,
double resourceLimitFactor = RESOURCE_LIMIT_FACTOR,
@@ -68,7 +69,7 @@ struct JobTest : public JobTestBase {
class JobDisabledByRemoveOpsTest : public JobTest {
public:
JobDisabledByRemoveOpsTest();
- ~JobDisabledByRemoveOpsTest();
+ ~JobDisabledByRemoveOpsTest() override;
void job_is_disabled_while_remove_ops_are_ongoing(bool remove_batch);
void job_becomes_disabled_if_remove_ops_starts(bool remove_batch);
@@ -80,7 +81,7 @@ struct MyCountJobRunner;
struct MaxOutstandingJobTest : public JobTest {
std::unique_ptr<MyCountJobRunner> runner;
MaxOutstandingJobTest();
- ~MaxOutstandingJobTest();
+ ~MaxOutstandingJobTest() override;
void init(uint32_t maxOutstandingMoveOps);
void assertRunToBlocked();
void assertRunToNotBlocked();
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index b188b97404d..d92a945a42a 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -741,14 +741,10 @@ PersistenceEngine::getWLock() const
namespace {
-class SyncExecutorOnDestruction : public vespalib::IDestructorCallback {
+class ExecutorRegistration : public vespalib::IDestructorCallback {
public:
- explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
- ~SyncExecutorOnDestruction() override {
- if (_executor) {
- _executor->sync();
- }
- }
+ explicit ExecutorRegistration(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
+ ~ExecutorRegistration() override = default;
private:
std::shared_ptr<BucketExecutor> _executor;
};
@@ -760,7 +756,7 @@ PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor)
{
assert(_bucket_executor.expired());
_bucket_executor = executor;
- return std::make_unique<SyncExecutorOnDestruction>(executor);
+ return std::make_unique<ExecutorRegistration>(executor);
}
std::unique_ptr<BucketTask>
@@ -772,11 +768,4 @@ PersistenceEngine::execute(const storage::spi::Bucket &bucket, std::unique_ptr<B
return task;
}
-void PersistenceEngine::sync() {
- auto bucketExecutor = get_bucket_executor();
- if (bucketExecutor) {
- bucketExecutor->sync();
- }
-}
-
} // storage
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 77dfc74d765..f1f680f63f6 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -130,7 +130,6 @@ public:
WriteGuard getWLock() const;
ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; }
std::unique_ptr<BucketTask> execute(const Bucket &bucket, std::unique_ptr<BucketTask> task) override;
- void sync() override;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
index d3dae686d2d..3987282ef34 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
@@ -130,6 +130,9 @@ LidSpaceCompactionJobBase::run()
}
if (_scanItr && !_scanItr->valid()) {
+ if (!inSync()) {
+ return false;
+ }
if (shouldRestartScanDocuments(_handler->getLidStatus())) {
_scanItr = _handler->getIterator();
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
index 759a18361f7..e91502f766c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
@@ -50,6 +50,7 @@ private:
void compactLidSpace(const search::LidUsageStats &stats);
bool remove_batch_is_ongoing() const;
bool remove_is_ongoing() const;
+ virtual bool inSync() const { return true; }
protected:
search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument);
public:
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index cbf3de20b1f..a345cb9c927 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -11,6 +11,7 @@
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <cassert>
+#include <thread>
using search::DocumentMetaData;
using search::LidUsageStats;
@@ -34,19 +35,19 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
}));
if (failed) return false;
+ _startedCount.fetch_add(1, std::memory_order_relaxed);
if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
return true;
}
}
}
- if (!_scanItr->valid()) {
- sync();
- }
return false;
}
void
CompactionJob::moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> context) {
+ _executedCount.fetch_add(1, std::memory_order_relaxed);
+ if (_stopped.load(std::memory_order_relaxed)) return;
// The real lid must be sampled in the master thread.
//TODO remove target lid from createMoveOperation interface
auto op = _handler->createMoveOperation(meta, 0);
@@ -55,6 +56,7 @@ CompactionJob::moveDocument(const search::DocumentMetaData & meta, std::shared_p
if (meta.gid != op->getDocument()->getId().getGlobalId()) return;
_master.execute(makeLambdaTask([this, metaThen=meta, moveOp=std::move(op), onDone=std::move(context)]() {
+ if (_stopped.load(std::memory_order_relaxed)) return;
search::DocumentMetaData metaNow = _handler->getMetaData(metaThen.lid);
if (metaNow.lid != metaThen.lid) return;
if (metaNow.bucketId != metaThen.bucketId) return;
@@ -82,20 +84,25 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
blockableConfig, clusterStateChangedNotifier, nodeRetired),
_master(master),
_bucketExecutor(bucketExecutor),
- _bucketSpace(bucketSpace)
-{
-}
+ _bucketSpace(bucketSpace),
+ _stopped(false),
+ _startedCount(0),
+ _executedCount(0)
+{ }
CompactionJob::~CompactionJob() = default;
-void
-CompactionJob::sync() {
- _bucketExecutor.sync();
+bool
+CompactionJob::inSync() const {
+ return _executedCount == _startedCount;
}
void
CompactionJob::onStop() {
- sync();
+ _stopped = true;
+ while ( ! inSync() ) {
+ std::this_thread::sleep_for(10us);
+ }
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 053950ebd2f..65b847d0468 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -4,8 +4,9 @@
#include "lid_space_compaction_job_base.h"
#include <vespa/document/bucket/bucketspace.h>
+#include <atomic>
-namespace storage::spi { struct BucketExecutor;}
+namespace storage::spi { struct BucketExecutor; }
namespace searchcorespi::index { struct IThreadService; }
namespace vespalib { class IDestructorCallback; }
namespace proton {
@@ -28,11 +29,14 @@ private:
IThreadService & _master;
BucketExecutor &_bucketExecutor;
document::BucketSpace _bucketSpace;
+ std::atomic<bool> _stopped;
+ std::atomic<size_t> _startedCount;
+ std::atomic<size_t> _executedCount;
bool scanDocuments(const search::LidUsageStats &stats) override;
void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone);
void onStop() override;
- void sync();
+ bool inSync() const override;
public:
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index c71a7fee424..7c175686359 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -49,10 +49,6 @@ public:
return _executor.execute(bucket, std::move(task));
}
- void sync() override {
- _executor.sync();
- }
-
private:
spi::BucketExecutor & _executor;
};
@@ -990,8 +986,4 @@ FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketT
return task;
}
-void
-FileStorManager::sync() {
-}
-
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 6eaef45e9bd..a48b4e0e208 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -175,7 +175,6 @@ private:
void update_reported_state_after_db_init();
std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override;
- void sync() override;
};
} // storage