summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp4
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp81
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h8
6 files changed, 42 insertions, 57 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
index a82437012c2..683709317c2 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
@@ -56,11 +56,11 @@ JobTestBase::init(uint32_t allowedLidBloat,
_singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
_master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor);
_bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4);
- _job = std::make_unique<lidspace::CompactionJob>(compactCfg, _handler, _storer, *_master, *_bucketExecutor,
+ _job = std::make_shared<lidspace::CompactionJob>(compactCfg, _handler, _storer, *_master, *_bucketExecutor,
_diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired,
document::BucketSpace::placeHolder());
} else {
- _job = std::make_unique<LidSpaceCompactionJob>(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier,
+ _job = std::make_shared<LidSpaceCompactionJob>(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier,
blockableCfg, _clusterStateHandler, nodeRetired);
}
}
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
index a78ad66592f..0a4e5c56acb 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
@@ -16,7 +16,7 @@ struct JobTestBase : public ::testing::TestWithParam<bool> {
MyFrozenBucketHandler _frozenHandler;
test::DiskMemUsageNotifier _diskMemUsageNotifier;
test::ClusterStateHandler _clusterStateHandler;
- std::unique_ptr<BlockableMaintenanceJob> _job;
+ std::shared_ptr<BlockableMaintenanceJob> _job;
JobTestBase();
~JobTestBase() override;
void init(uint32_t allowedLidBloat,
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
index 99efc3d4e41..225d128f9bf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
@@ -130,9 +130,6 @@ LidSpaceCompactionJobBase::run()
}
if (_scanItr && !_scanItr->valid()) {
- if (!inSync()) {
- return false;
- }
if (shouldRestartScanDocuments(_handler->getLidStatus())) {
_scanItr = _handler->getIterator();
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
index 774d82a5d1d..d127025c496 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
@@ -50,7 +50,6 @@ private:
void compactLidSpace(const search::LidUsageStats &stats);
bool remove_batch_is_ongoing() const;
bool remove_is_ongoing() const;
- virtual bool inSync() const { return true; }
protected:
search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument);
public:
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index 70a63a4a4eb..52fd18cd971 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -23,31 +23,40 @@ namespace proton::lidspace {
namespace {
-class IncOnDestruct {
-public:
- IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
- ~IncOnDestruct() {
- _count.fetch_add(1, std::memory_order_relaxed);
- }
-private:
- std::atomic<size_t> & _count;
-};
-
bool
-isSameDocument(const search::DocumentMetaData & a, const search::DocumentMetaData & b) {
+isSameDocument(const search::DocumentMetaData &a, const search::DocumentMetaData &b) {
return (a.lid == b.lid) &&
(a.bucketId == b.bucketId) &&
(a.gid == b.gid) &&
- (a.timestamp == b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale.
+ (a.timestamp ==
+ b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale.
}
}
-void
-CompactionJob::failOperation() {
- IncOnDestruct countGuard(_executedCount);
- _master.execute(makeLambdaTask([this] { _scanItr.reset(); }));
-}
+class CompactionJob::MoveTask : public storage::spi::BucketTask {
+public:
+ MoveTask(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & meta, IDestructorCallback::SP opsTracker)
+ : _job(std::move(job)),
+ _meta(meta),
+ _opsTracker(std::move(opsTracker))
+ { }
+ void run(const Bucket & bucket, IDestructorCallback::SP onDone) override {
+ assert(bucket.getBucketId() == _meta.bucketId);
+ using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
+ CompactionJob::moveDocument(std::move(_job), _meta,
+ std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
+ }
+ void fail(const Bucket & bucket) override {
+ assert(bucket.getBucketId() == _meta.bucketId);
+ auto & master = _job->_master;
+ master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); }));
+ }
+private:
+ std::shared_ptr<CompactionJob> _job;
+ const search::DocumentMetaData _meta;
+ IDestructorCallback::SP _opsTracker;
+};
bool
CompactionJob::scanDocuments(const LidUsageStats &stats)
@@ -56,16 +65,7 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
DocumentMetaData document = getNextDocument(stats, false);
if (document.valid()) {
Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId));
- IDestructorCallback::SP context = getLimiter().beginOperation();
- auto bucketTask = makeBucketTask([this, meta=document, opsTracker=std::move(context)]
- (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) {
- assert(bucket.getBucketId() == meta.bucketId);
- using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
- moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
- }, [this](const Bucket &) { failOperation(); });
-
- _startedCount.fetch_add(1, std::memory_order_relaxed);
- _bucketExecutor.execute(metaBucket, std::move(bucketTask));
+ _bucketExecutor.execute(metaBucket, std::make_unique<MoveTask>(shared_from_this(), document, getLimiter().beginOperation()));
if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
return true;
}
@@ -75,19 +75,20 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
}
void
-CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> context) {
- IncOnDestruct countGuard(_executedCount);
- if (_stopped.load(std::memory_order_relaxed)) return;
+CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen,
+ std::shared_ptr<IDestructorCallback> context)
+{
// The real lid must be sampled in the master thread.
//TODO remove target lid from createMoveOperation interface
- auto op = _handler->createMoveOperation(metaThen, 0);
+ auto op = job->_handler->createMoveOperation(metaThen, 0);
if (!op || !op->getDocument()) return;
// Early detection and force md5 calculation outside of master thread
if (metaThen.gid != op->getDocument()->getId().getGlobalId()) return;
- _master.execute(makeLambdaTask([this, meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable {
- if (_stopped.load(std::memory_order_relaxed)) return;
- completeMove(meta, std::move(moveOp), std::move(onDone));
+ auto & master = job->_master;
+ master.execute(makeLambdaTask([self=std::move(job), meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable {
+ if (self->_stopped.load(std::memory_order_relaxed)) return;
+ self->completeMove(meta, std::move(moveOp), std::move(onDone));
}));
}
@@ -125,24 +126,14 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
_master(master),
_bucketExecutor(bucketExecutor),
_bucketSpace(bucketSpace),
- _stopped(false),
- _startedCount(0),
- _executedCount(0)
+ _stopped(false)
{ }
CompactionJob::~CompactionJob() = default;
-bool
-CompactionJob::inSync() const {
- return _executedCount == _startedCount;
-}
-
void
CompactionJob::onStop() {
_stopped = true;
- while ( ! inSync() ) {
- std::this_thread::sleep_for(1ms);
- }
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 21445de8d85..bbf39e5adf6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -31,16 +31,14 @@ private:
BucketExecutor &_bucketExecutor;
document::BucketSpace _bucketSpace;
std::atomic<bool> _stopped;
- std::atomic<size_t> _startedCount;
- std::atomic<size_t> _executedCount;
bool scanDocuments(const search::LidUsageStats &stats) override;
- void moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> onDone);
+ static void moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen,
+ std::shared_ptr<IDestructorCallback> onDone);
void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp,
std::shared_ptr<IDestructorCallback> onDone);
void onStop() override;
- bool inSync() const override;
- void failOperation();
+ class MoveTask;
public:
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
std::shared_ptr<ILidSpaceCompactionHandler> handler,