diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-19 07:46:55 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-19 13:54:30 +0000 |
commit | 5e676a8ffc0034848366293ad744e0850c585884 (patch) | |
tree | 7f440a67a5af24f340dcbf8bd3c0a94087b7ca17 /storage | |
parent | 317fedff48f7211e9d48c7d407d0512bd4ee65b9 (diff) |
Implement bucketexecutor interface and.
Diffstat (limited to 'storage')
10 files changed, 125 insertions, 1 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index bbde377fdec..21a94a3e957 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -209,6 +209,12 @@ PersistenceProviderWrapper::register_resource_usage_listener(spi::IResourceUsage return _spi.register_resource_usage_listener(listener); } +std::unique_ptr<vespalib::IDestructorCallback> +PersistenceProviderWrapper::register_executor(std::shared_ptr<spi::BucketExecutor> executor) +{ + return _spi.register_executor(std::move(executor)); +} + spi::Result PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp timestamp, diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index b07f9c5e0f5..085a60c0e86 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -110,6 +110,7 @@ public: const spi::Bucket& target, spi::Context&) override; spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor>) override; }; } // storage diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 74ad5d7f2ce..2cd9f34c0f3 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -14,6 +14,7 @@ #include <vespa/fastos/file.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/persistence/spi/test.h> +#include <vespa/persistence/spi/bucket_tasks.h> #include <vespa/storage/bucketdb/bucketmanager.h> #include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> @@ -24,6 +25,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/vdslib/state/random.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/gate.h> #include <atomic> #include <thread> @@ -35,6 +37,7 @@ using document::Document; using namespace storage::api; using storage::spi::test::makeSpiBucket; using document::test::makeDocumentBucket; +using vespalib::IDestructorCallback; using namespace ::testing; #define ASSERT_SINGLE_REPLY(replytype, reply, link, time) \ @@ -409,6 +412,50 @@ TEST_F(FileStorManagerTest, put) { } } +TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { + TestFileStorComponents c(*this); + + setClusterState("storage:3 distributor:3"); + EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); + + auto executor = getDummyPersistence().get_bucket_executor(); + EXPECT_TRUE(executor); + + spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); + std::atomic<size_t> numInvocations(0); + auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { + numInvocations++; + })); + EXPECT_TRUE(response); + EXPECT_EQ(0, numInvocations); + response->run(spi::Bucket(), {}); + EXPECT_EQ(1, numInvocations); +} + +TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { + TestFileStorComponents c(*this); + + setClusterState("storage:3 distributor:3"); + EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); + + auto executor = getDummyPersistence().get_bucket_executor(); + EXPECT_TRUE(executor); + + spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); + + createBucket(b1.getBucketId()); + + std::atomic<size_t> numInvocations(0); + vespalib::Gate gate; + auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { + numInvocations++; + gate.countDown(); + })); + EXPECT_FALSE(response); + gate.await(); + EXPECT_EQ(1, numInvocations); +} + TEST_F(FileStorManagerTest, state_change) { TestFileStorComponents c(*this); diff --git a/storage/src/vespa/storage/common/messagebucket.cpp b/storage/src/vespa/storage/common/messagebucket.cpp index 61283fd3d04..3aa90138f36 100644 --- a/storage/src/vespa/storage/common/messagebucket.cpp +++ b/storage/src/vespa/storage/common/messagebucket.cpp @@ -66,6 +66,8 @@ getStorageMessageBucket(const api::StorageMessage& msg) return static_cast<const ReadBucketInfo&>(msg).getBucket(); case RecheckBucketInfoCommand::ID: return static_cast<const RecheckBucketInfoCommand&>(msg).getBucket(); + case RunTaskCommand::ID: + return static_cast<const RunTaskCommand&>(msg).getBucket(); default: break; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 4b105f6688f..b8ed6b8ec91 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -6,6 +6,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/vespalib/util/destructor_callbacks.h> namespace storage { @@ -96,6 +97,18 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider {} MessageTracker::UP +AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const { + auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + spi::Bucket bucket(cmd.getBucket()); + auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)); + cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone))); + return tracker; +} + +MessageTracker::UP AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) const { MessageTracker & tracker = *trackerUP; diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 92bf72e7c51..2d6b37a1cdd 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -2,6 +2,7 @@ #pragma once #include "types.h" +#include "messages.h" #include <vespa/storageapi/message/persistence.h> namespace document { class BucketIdFactory; } @@ -25,6 +26,7 @@ public: MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: static bool tasConditionExists(const api::TestAndSetCommand & cmd); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 0f9c9894615..aadc58c9af6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -972,7 +972,11 @@ void FileStorManager::initialize_bucket_databases_from_provider() { std::unique_ptr<spi::BucketTask> FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) { - (void) bucket; + StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( + bucket.getBucketId(), "FileStorManager::execute")); + if (entry.exist()) { + _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task))); + } return task; } diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 90597a49ad7..528a19ba7dc 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -2,6 +2,7 @@ #include "messages.h" #include <ostream> +#include <cassert> using document::BucketSpace; @@ -177,4 +178,23 @@ AbortBucketOperationsCommand::makeReply() { return std::make_unique<AbortBucketOperationsReply>(*this); } +std::unique_ptr<api::StorageReply> +RunTaskCommand::makeReply() { + return std::make_unique<RunTaskReply>(*this); +} + +RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) + : api::InternalCommand(ID), + _task(std::move(task)), + _bucket(bucket) +{ + assert(_task); +} + +RunTaskCommand::~RunTaskCommand() = default; + +RunTaskReply::RunTaskReply(const RunTaskCommand& cmd) + : api::InternalReply(ID, cmd) +{} + } diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index a465437ae21..55ca87a1941 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -6,6 +6,7 @@ #include <vespa/persistence/spi/bucket.h> #include <vespa/persistence/spi/selection.h> #include <vespa/persistence/spi/read_consistency.h> +#include <vespa/persistence/spi/bucketexecutor.h> namespace storage { @@ -237,5 +238,31 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; }; +class RunTaskCommand : public api::InternalCommand { +public: + static const uint32_t ID = 1011; + RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task); + ~RunTaskCommand(); + + document::Bucket getBucket() const override { return _bucket.getBucket(); } + std::unique_ptr<api::StorageReply> makeReply() override; + spi::BucketTask & task() & { + return *_task; + } + +private: + std::unique_ptr<spi::BucketTask> _task; + spi::Bucket _bucket; +}; + +class RunTaskReply : public api::InternalReply +{ +public: + explicit RunTaskReply(const RunTaskCommand&); +private: + static const uint32_t ID = 1012; +}; + + } // ns storage diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index cbe5454f4e7..38ffd2c57e7 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -75,6 +75,8 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); + case RunTaskCommand::ID: + return _asyncHandler.handleRunTask(static_cast<RunTaskCommand &>(msg), std::move(tracker)); default: LOG(warning, "Persistence handler received unhandled internal command %s", msg.toString().c_str()); break; |