summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp8
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h2
-rw-r--r--persistence/src/vespa/persistence/spi/bucket_tasks.h21
-rw-r--r--persistence/src/vespa/persistence/spi/bucketexecutor.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp78
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp39
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h1
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp15
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp15
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h2
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/messages.h1
-rw-r--r--storageapi/src/vespa/storageapi/message/internal.h4
17 files changed, 148 insertions, 94 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
index b76787736ac..d3e765d3d56 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -22,9 +22,9 @@ DummyBucketExecutor::~DummyBucketExecutor() {
sync();
}
-std::unique_ptr<BucketTask>
+void
DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) {
- _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
+ auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
{
std::unique_lock guard(_lock);
// Use contains when dropping support for gcc 8.
@@ -41,7 +41,9 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask>
_cond.notify_all();
}));
}));
- return task;
+ if (failed) {
+ failed->run();
+ }
}
void
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
index 2479787eb31..a3cf146a0f4 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -17,7 +17,7 @@ class DummyBucketExecutor : public BucketExecutor {
public:
DummyBucketExecutor(size_t numExecutors);
~DummyBucketExecutor() override;
- std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
+ void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
void sync();
private:
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h
index 0b9c283817d..9468dd09dc3 100644
--- a/persistence/src/vespa/persistence/spi/bucket_tasks.h
+++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h
@@ -9,11 +9,12 @@ namespace storage::spi {
/**
* Simple Bucket task that wraps a lambda that does the job.
*/
-template<class FunctionType>
+template<class FunctionType, class FailedFunction>
class LambdaBucketTask : public BucketTask {
public:
- explicit LambdaBucketTask(FunctionType &&func)
- : _func(std::move(func))
+ explicit LambdaBucketTask(FunctionType &&func, FailedFunction &&failed)
+ : _func(std::move(func)),
+ _failed(std::move(failed))
{}
~LambdaBucketTask() override = default;
@@ -21,16 +22,20 @@ public:
void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) override {
_func(bucket, std::move(onComplete));
}
+ void fail(const Bucket & bucket) override {
+ _failed(bucket);
+ }
private:
- FunctionType _func;
+ FunctionType _func;
+ FailedFunction _failed;
};
-template<class FunctionType>
+template<class FunctionType, class FailedFunction>
std::unique_ptr<BucketTask>
-makeBucketTask(FunctionType &&function) {
- return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>>>
- (std::forward<FunctionType>(function));
+makeBucketTask(FunctionType &&function, FailedFunction && failed) {
+ return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>, std::decay_t<FailedFunction>>>
+ (std::forward<FunctionType>(function), std::forward<FailedFunction>(failed));
}
}
diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h
index 26884dd04ae..350376be595 100644
--- a/persistence/src/vespa/persistence/spi/bucketexecutor.h
+++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h
@@ -20,16 +20,16 @@ class BucketTask {
public:
virtual ~BucketTask() = default;
virtual void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) = 0;
+ virtual void fail(const Bucket &) = 0;
};
/**
- * Interface for running a BucketTask. If it fails the task will be returned.
- * That would normally indicate a fatal error.
- * sync() will be called during detatching to ensure the implementation is drained.
+ * Interface for running a BucketTask. If running the task fails either synchronously or asynchronously.
+ * The fail method will be invoked, either synchronously or asynchronously.
*/
struct BucketExecutor {
virtual ~BucketExecutor() = default;
- virtual std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) = 0;
+ virtual void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) = 0;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index d92a945a42a..31acf183989 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -759,13 +759,14 @@ PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor)
return std::make_unique<ExecutorRegistration>(executor);
}
-std::unique_ptr<BucketTask>
+void
PersistenceEngine::execute(const storage::spi::Bucket &bucket, std::unique_ptr<BucketTask> task) {
auto bucketExecutor = get_bucket_executor();
if (bucketExecutor) {
bucketExecutor->execute(bucket, std::move(task));
+ } else {
+ return task->fail(bucket);
}
- return task;
}
} // storage
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index f1f680f63f6..ac7a9847873 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -129,7 +129,7 @@ public:
void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler);
WriteGuard getWLock() const;
ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; }
- std::unique_ptr<BucketTask> execute(const Bucket &bucket, std::unique_ptr<BucketTask> task) override;
+ void execute(const Bucket &bucket, std::unique_ptr<BucketTask> task) override;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index becaa0c9e31..03c5c1401a0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -150,31 +150,6 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const {
return {true, wantReady};
}
-void
-BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
- auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
- if (done) {
- mover->setBucketDone();
- }
- if (keys.empty()) return;
- if (_stopped.load(std::memory_order_relaxed)) return;
- mover->updateLastValidGid(keys.back()._gid);
- Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
- auto bucketTask = makeBucketTask(
- [this, mover=std::move(mover), keys=std::move(keys),opsTracker=getLimiter().beginOperation()]
- (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) mutable
- {
- assert(mover->getBucket() == bucket.getBucketId());
- using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
- prepareMove(std::move(mover), std::move(keys),
- std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
- });
- auto failed = _bucketExecutor.execute(spiBucket, std::move(bucketTask));
- if (!failed) {
- _startedCount.fetch_add(1, std::memory_order_relaxed);
- }
-}
-
namespace {
class IncOnDestruct {
@@ -189,6 +164,59 @@ private:
}
+class StartMove : public storage::spi::BucketTask {
+public:
+ using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>;
+ StartMove(BucketMoveJobV2 & job, std::shared_ptr<BucketMover> mover,
+ std::vector<BucketMover::MoveKey> keys,
+ IDestructorCallbackSP opsTracker)
+ : _job(job),
+ _mover(std::move(mover)),
+ _keys(std::move(keys)),
+ _opsTracker(std::move(opsTracker))
+ {}
+
+ void run(const Bucket &bucket, IDestructorCallbackSP onDone) override {
+ assert(_mover->getBucket() == bucket.getBucketId());
+ using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
+ _job.prepareMove(std::move(_mover), std::move(_keys),
+ std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
+ }
+
+ void fail(const Bucket &bucket) override {
+ _job._master.execute(makeLambdaTask([this, bucketId=bucket.getBucketId()]() {
+ _job.failOperation(bucketId);
+ }));
+ }
+
+private:
+ BucketMoveJobV2 & _job;
+ std::shared_ptr<BucketMover> _mover;
+ std::vector<BucketMover::MoveKey> _keys;
+ IDestructorCallbackSP _opsTracker;
+};
+
+void
+BucketMoveJobV2::failOperation(BucketId bucketId) {
+ IncOnDestruct countGuard(_executedCount);
+ considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
+}
+
+void
+BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
+ auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
+ if (done) {
+ mover->setBucketDone();
+ }
+ if (keys.empty()) return;
+ if (_stopped.load(std::memory_order_relaxed)) return;
+ mover->updateLastValidGid(keys.back()._gid);
+ Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
+ auto bucketTask = std::make_unique<StartMove>(*this, std::move(mover), std::move(keys), getLimiter().beginOperation());
+ _startedCount.fetch_add(1, std::memory_order_relaxed);
+ _bucketExecutor.execute(spiBucket, std::move(bucketTask));
+}
+
void
BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index 7e0c45ba8fa..afb486ed011 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -86,6 +86,8 @@ private:
void backFillMovers();
void cancelMovesForBucket(BucketId bucket);
bool moveDocs(size_t maxDocsToMove);
+ void failOperation(BucketId bucket);
+ friend class StartMove;
public:
BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
IDocumentMoveHandler &moveHandler,
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index 7557f19412d..bae4833d657 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -21,6 +21,26 @@ using vespalib::makeLambdaTask;
namespace proton::lidspace {
+namespace {
+
+class IncOnDestruct {
+public:
+ IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
+ ~IncOnDestruct() {
+ _count.fetch_add(1, std::memory_order_relaxed);
+ }
+private:
+ std::atomic<size_t> & _count;
+};
+
+}
+
+void
+CompactionJob::failOperation() {
+ _executedCount.fetch_add(1, std::memory_order_relaxed);
+ _scanItr.reset();
+}
+
bool
CompactionJob::scanDocuments(const LidUsageStats &stats)
{
@@ -29,14 +49,15 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
if (document.valid()) {
Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId));
IDestructorCallback::SP context = getLimiter().beginOperation();
- auto failed = _bucketExecutor.execute(metaBucket, makeBucketTask([this, meta=document, opsTracker=std::move(context)]
+ auto bucketTask = makeBucketTask([this, meta=document, opsTracker=std::move(context)]
(const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) {
assert(bucket.getBucketId() == meta.bucketId);
using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
- }));
- if (failed) return false;
+ }, [this](const Bucket &) { _master.execute(makeLambdaTask([this] { failOperation(); } )); });
+
_startedCount.fetch_add(1, std::memory_order_relaxed);
+ _bucketExecutor.execute(metaBucket, std::move(bucketTask));
if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
return true;
}
@@ -45,18 +66,6 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
return false;
}
-namespace {
- class IncOnDestruct {
- public:
- IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
- ~IncOnDestruct() {
- _count.fetch_add(1, std::memory_order_relaxed);
- }
- private:
- std::atomic<size_t> & _count;
- };
-}
-
void
CompactionJob::moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> context) {
IncOnDestruct countGuard(_executedCount);
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 65b847d0468..af9fe50c545 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -37,6 +37,7 @@ private:
void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone);
void onStop() override;
bool inSync() const override;
+ void failOperation();
public:
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 290a3574209..891c0e5dace 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -425,14 +425,13 @@ TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) {
ASSERT_TRUE(executor);
spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
- std::atomic<size_t> numInvocations(0);
- auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
- numInvocations++;
- }));
- ASSERT_TRUE(response);
- EXPECT_EQ(0, numInvocations);
- response->run(spi::Bucket(), {});
- EXPECT_EQ(1, numInvocations);
+ std::atomic<size_t> success(0);
+ std::atomic<size_t> failures(0);
+ auto task = spi::makeBucketTask([&success](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { success++;},
+ [&failures](const spi::Bucket &) { failures++; });
+ executor->execute(b1, std::move(task));
+ EXPECT_EQ(0, success);
+ EXPECT_EQ(1, failures);
}
TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
@@ -448,15 +447,16 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
createBucket(b1.getBucketId());
- std::atomic<size_t> numInvocations(0);
+ std::atomic<size_t> success(0);
+ std::atomic<size_t> failures(0);
vespalib::Gate gate;
- auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
- numInvocations++;
+ executor->execute(b1, spi::makeBucketTask([&success, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
+ success++;
gate.countDown();
- }));
- EXPECT_FALSE(response);
+ }, [&failures](const spi::Bucket &) { failures++; }));
gate.await();
- EXPECT_EQ(1, numInvocations);
+ EXPECT_EQ(1, success);
+ EXPECT_EQ(0, failures);
}
TEST_F(FileStorManagerTest, state_change) {
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index dce59262298..d80666518ee 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -565,8 +565,7 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
// Fail with bucket not found if op != MOVE
api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
- if (op == MOVE) {
- } else {
+ if (op != MOVE) {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
@@ -597,8 +596,7 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
// Move to correct queue if op == MOVE
// Fail with bucket not found if op != MOVE
if (bucket == source) {
- if (op == MOVE) {
- } else {
+ if (op != MOVE) {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
@@ -608,8 +606,7 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
// Move to correct queue if op == MOVE
// Fail with bucket not found if op != MOVE
if (bucket == source) {
- if (op == MOVE) {
- } else {
+ if (op != MOVE) {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
@@ -623,9 +620,11 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
break;
}
case RunTaskCommand::ID:
- LOG(debug, "Remapping load for bucket %s for reason %u, "
- "for RunTaskCommand request for this bucket.",
+ LOG(debug, "While remapping load for bucket %s for reason %u, "
+ "we fail the RunTaskCommand.",
source.getBucketId().toString().c_str(), op);
+ returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
+ "Will not run task that should be remapped.");
break;
default:
// Fail and log error
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index f4a94af0d35..83f268358cb 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -35,7 +35,7 @@ using vespalib::make_string_short::fmt;
namespace {
- VESPA_THREAD_STACK_TAG(response_executor)
+VESPA_THREAD_STACK_TAG(response_executor)
}
@@ -46,8 +46,8 @@ class BucketExecutorWrapper : public spi::BucketExecutor {
public:
BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { }
- std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override {
- return _executor.execute(bucket, std::move(task));
+ void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override {
+ _executor.execute(bucket, std::move(task));
}
private:
@@ -975,17 +975,16 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
_init_handler.notifyDoneInitializing();
}
-std::unique_ptr<spi::BucketTask>
+void
FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) {
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::execute"));
if (entry.exist()) {
auto cmd = std::make_shared<RunTaskCommand>(bucket, std::move(task));
- if ( ! _filestorHandler->schedule(cmd) ) {
- task = cmd->stealTask();
- }
+ _filestorHandler->schedule(cmd);
+ } else {
+ task->fail(bucket);
}
- return task;
}
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index a48b4e0e208..ee4276cb21e 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -174,7 +174,7 @@ private:
void propagateClusterStates();
void update_reported_state_after_db_init();
- std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override;
+ void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index beae22a429b..42285556530 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -189,7 +189,11 @@ RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::B
_bucket(bucket)
{ }
-RunTaskCommand::~RunTaskCommand() = default;
+RunTaskCommand::~RunTaskCommand() {
+ if (_task) {
+ _task->fail(_bucket);
+ }
+}
void
RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const {
@@ -206,12 +210,15 @@ RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestr
{
if (_task) {
_task->run(bucket, std::move(onComplete));
+ _task.reset();
}
}
RunTaskReply::RunTaskReply(const RunTaskCommand& cmd)
: api::InternalReply(ID, cmd)
-{}
+{ }
+
+RunTaskReply::~RunTaskReply() = default;
void
RunTaskReply::print(std::ostream& out, bool verbose, const std::string& indent) const {
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index 50834782f39..d8cfb3ac445 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -263,6 +263,7 @@ class RunTaskReply : public api::InternalReply
{
public:
explicit RunTaskReply(const RunTaskCommand&);
+ ~RunTaskReply();
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
private:
static constexpr uint32_t ID = 1012;
diff --git a/storageapi/src/vespa/storageapi/message/internal.h b/storageapi/src/vespa/storageapi/message/internal.h
index 4246c43a3c4..af50abba005 100644
--- a/storageapi/src/vespa/storageapi/message/internal.h
+++ b/storageapi/src/vespa/storageapi/message/internal.h
@@ -32,7 +32,7 @@ class InternalCommand : public StorageCommand {
public:
InternalCommand(uint32_t type);
- ~InternalCommand();
+ ~InternalCommand() override;
uint32_t getType() const { return _type; }
@@ -54,7 +54,7 @@ class InternalReply : public StorageReply {
public:
InternalReply(uint32_t type, const InternalCommand& cmd);
- ~InternalReply();
+ ~InternalReply() override;
uint32_t getType() const { return _type; }