diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-23 21:09:08 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-23 21:21:34 +0000 |
commit | 2d1f4ba77c803a7bdb9f4a2f0bd31b6208011f82 (patch) | |
tree | 0fb95d10334300368a147af2fc9e335b8d4c9b5f /storage | |
parent | fcfe291173a6a6f14a77abec96709383fee21866 (diff) |
- Change error handling so that both synchonous and asynchronous errors can be reported back from bucket executor.
- Treat remapping as an error.
- For lidspace compaction job iterator is reset and will be recreated on next invocation.
- For bucketmove th ebucket is rechecked and either discarded or restarted.
Diffstat (limited to 'storage')
6 files changed, 39 insertions, 33 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 290a3574209..891c0e5dace 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -425,14 +425,13 @@ TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { ASSERT_TRUE(executor); spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); - std::atomic<size_t> numInvocations(0); - auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { - numInvocations++; - })); - ASSERT_TRUE(response); - EXPECT_EQ(0, numInvocations); - response->run(spi::Bucket(), {}); - EXPECT_EQ(1, numInvocations); + std::atomic<size_t> success(0); + std::atomic<size_t> failures(0); + auto task = spi::makeBucketTask([&success](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { success++;}, + [&failures](const spi::Bucket &) { failures++; }); + executor->execute(b1, std::move(task)); + EXPECT_EQ(0, success); + EXPECT_EQ(1, failures); } TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { @@ -448,15 +447,16 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { createBucket(b1.getBucketId()); - std::atomic<size_t> numInvocations(0); + std::atomic<size_t> success(0); + std::atomic<size_t> failures(0); vespalib::Gate gate; - auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { - numInvocations++; + executor->execute(b1, spi::makeBucketTask([&success, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { + success++; gate.countDown(); - })); - EXPECT_FALSE(response); + }, [&failures](const spi::Bucket &) { failures++; })); gate.await(); - EXPECT_EQ(1, numInvocations); + EXPECT_EQ(1, success); + EXPECT_EQ(0, failures); } TEST_F(FileStorManagerTest, state_change) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index dce59262298..d80666518ee 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -565,8 +565,7 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck // Fail with bucket not found if op != MOVE api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { - if (op == MOVE) { - } else { + if (op != MOVE) { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } @@ -597,8 +596,7 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck // Move to correct queue if op == MOVE // Fail with bucket not found if op != MOVE if (bucket == source) { - if (op == MOVE) { - } else { + if (op != MOVE) { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } @@ -608,8 +606,7 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck // Move to correct queue if op == MOVE // Fail with bucket not found if op != MOVE if (bucket == source) { - if (op == MOVE) { - } else { + if (op != MOVE) { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } @@ -623,9 +620,11 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck break; } case RunTaskCommand::ID: - LOG(debug, "Remapping load for bucket %s for reason %u, " - "for RunTaskCommand request for this bucket.", + LOG(debug, "While remapping load for bucket %s for reason %u, " + "we fail the RunTaskCommand.", source.getBucketId().toString().c_str(), op); + returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, + "Will not run task that should be remapped."); break; default: // Fail and log error diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index f4a94af0d35..83f268358cb 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -35,7 +35,7 @@ using vespalib::make_string_short::fmt; namespace { - VESPA_THREAD_STACK_TAG(response_executor) +VESPA_THREAD_STACK_TAG(response_executor) } @@ -46,8 +46,8 @@ class BucketExecutorWrapper : public spi::BucketExecutor { public: BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { } - std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override { - return _executor.execute(bucket, std::move(task)); + void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override { + _executor.execute(bucket, std::move(task)); } private: @@ -975,17 +975,16 @@ void FileStorManager::initialize_bucket_databases_from_provider() { _init_handler.notifyDoneInitializing(); } -std::unique_ptr<spi::BucketTask> +void FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::execute")); if (entry.exist()) { auto cmd = std::make_shared<RunTaskCommand>(bucket, std::move(task)); - if ( ! _filestorHandler->schedule(cmd) ) { - task = cmd->stealTask(); - } + _filestorHandler->schedule(cmd); + } else { + task->fail(bucket); } - return task; } } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index a48b4e0e208..ee4276cb21e 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -174,7 +174,7 @@ private: void propagateClusterStates(); void update_reported_state_after_db_init(); - std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override; + void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override; }; } // storage diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index beae22a429b..42285556530 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -189,7 +189,11 @@ RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::B _bucket(bucket) { } -RunTaskCommand::~RunTaskCommand() = default; +RunTaskCommand::~RunTaskCommand() { + if (_task) { + _task->fail(_bucket); + } +} void RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { @@ -206,12 +210,15 @@ RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestr { if (_task) { _task->run(bucket, std::move(onComplete)); + _task.reset(); } } RunTaskReply::RunTaskReply(const RunTaskCommand& cmd) : api::InternalReply(ID, cmd) -{} +{ } + +RunTaskReply::~RunTaskReply() = default; void RunTaskReply::print(std::ostream& out, bool verbose, const std::string& indent) const { diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index 50834782f39..d8cfb3ac445 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -263,6 +263,7 @@ class RunTaskReply : public api::InternalReply { public: explicit RunTaskReply(const RunTaskCommand&); + ~RunTaskReply(); void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: static constexpr uint32_t ID = 1012; |