diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-20 13:02:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-20 13:02:35 +0100 |
commit | 669dcc1c35f21672fb63363a5a7f2f7989f20cef (patch) | |
tree | e2dd3036aeda242958c21cd137575cbabfbf1312 | |
parent | 6f649d5dd2234c7dd581f9ac206663cbfa6b8ae8 (diff) | |
parent | ff551559deacaacc3bb77699686bb6c65e08a818 (diff) |
Merge pull request #16098 from vespa-engine/balder/implement-bucketexecutor-for-filestormanager
Implement bucketexecutor interface and.
18 files changed, 257 insertions, 21 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 0c1c5db69d6..0865500d3c0 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -8,6 +8,7 @@ #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/persistence/spi/i_resource_usage_listener.h> #include <vespa/persistence/spi/resource_usage.h> +#include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/util/crc.h> #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -863,6 +864,30 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste return {}; } +namespace { + +class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { +public: + explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~SyncExecutorOnDestruction() override { + if (_executor) { + _executor->sync(); + } + } +private: + std::shared_ptr<BucketExecutor> _executor; +}; + +} + +std::unique_ptr<vespalib::IDestructorCallback> +DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor) +{ + assert(_bucket_executor.expired()); + _bucket_executor = executor; + return std::make_unique<SyncExecutorOnDestruction>(executor); +} + std::string DummyPersistence::dumpBucket(const Bucket& b) const { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c37af0d33eb..ad50648abaf 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -176,6 +176,8 @@ public: Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; + std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); } /** * The following methods are used only for unit testing. @@ -213,6 +215,7 @@ private: std::condition_variable _cond; std::unique_ptr<ClusterState> _clusterState; + std::weak_ptr<BucketExecutor> _bucket_executor; std::unique_ptr<document::select::Node> parseDocumentSelection( const string& documentSelection, diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index facdb2cadfa..1d873e9a20e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -30,10 +30,4 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const return BucketIdListResult(list); } -std::unique_ptr<vespalib::IDestructorCallback> -AbstractPersistenceProvider::register_executor(std::shared_ptr<BucketExecutor>) -{ - return {}; -} - } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 5023febe9a2..016928ab10e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -48,8 +48,6 @@ public: * Default impl empty. */ BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - - std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; }; } diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h new file mode 100644 index 00000000000..0b9c283817d --- /dev/null +++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucketexecutor.h" + +namespace storage::spi { + +/** + * Simple Bucket task that wraps a lambda that does the job. + */ +template<class FunctionType> +class LambdaBucketTask : public BucketTask { +public: + explicit LambdaBucketTask(FunctionType &&func) + : _func(std::move(func)) + {} + + ~LambdaBucketTask() override = default; + + void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) override { + _func(bucket, std::move(onComplete)); + } + +private: + FunctionType _func; +}; + +template<class FunctionType> +std::unique_ptr<BucketTask> +makeBucketTask(FunctionType &&function) { + return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + +} diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h index 07c86fd5ffb..8237b78cca0 100644 --- a/persistence/src/vespa/persistence/spi/bucketexecutor.h +++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h @@ -3,7 +3,8 @@ #pragma once #include "bucket.h" -#include "operationcomplete.h" + +namespace vespalib { class IDestructorCallback; } namespace storage::spi { @@ -17,7 +18,7 @@ namespace storage::spi { class BucketTask { public: virtual ~BucketTask() = default; - virtual void run(const Bucket & bucket, std::unique_ptr<OperationComplete> onComplete) = 0; + virtual void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) = 0; }; /** diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 0d5d8ad3144..8bec6f9dd68 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -3,6 +3,7 @@ #include "persistenceengine.h" #include "ipersistenceengineowner.h" #include "transport_latch.h" +#include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/stllike/hash_set.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/datatype/documenttype.h> @@ -18,6 +19,7 @@ LOG_SETUP(".proton.persistenceengine.persistenceengine"); using document::Document; using document::DocumentId; using storage::spi::BucketChecksum; +using storage::spi::BucketExecutor; using storage::spi::BucketIdListResult; using storage::spi::BucketInfo; using storage::spi::BucketInfoResult; @@ -737,4 +739,28 @@ PersistenceEngine::getWLock() const return WriteGuard(_rwMutex); } +namespace { + +class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { +public: + explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~SyncExecutorOnDestruction() override { + if (_executor) { + _executor->sync(); + } + } +private: + std::shared_ptr<BucketExecutor> _executor; +}; + +} + +std::unique_ptr<vespalib::IDestructorCallback> +PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor) +{ + assert(_bucket_executor.expired()); + _bucket_executor = executor; + return std::make_unique<SyncExecutorOnDestruction>(executor); +} + } // storage diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 659156fdea0..b5a99525575 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -39,6 +39,7 @@ private: using TimestampList = storage::spi::TimestampList; using UpdateResult = storage::spi::UpdateResult; using OperationComplete = storage::spi::OperationComplete; + using BucketExecutor = storage::spi::BucketExecutor; struct IteratorEntry { PersistenceHandlerSequence handler_sequence; @@ -73,6 +74,7 @@ private: mutable ExtraModifiedBuckets _extraModifiedBuckets; mutable std::shared_mutex _rwMutex; std::shared_ptr<ResourceUsageTracker> _resource_usage_tracker; + std::weak_ptr<BucketExecutor> _bucket_executor; using ReadGuard = std::shared_lock<std::shared_mutex>; using WriteGuard = std::unique_lock<std::shared_mutex>; @@ -116,12 +118,14 @@ public: Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; void destroyIterators(); void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler); void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler); void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler); WriteGuard getWLock() const; ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; } + std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); } }; } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index bbde377fdec..21a94a3e957 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -209,6 +209,12 @@ PersistenceProviderWrapper::register_resource_usage_listener(spi::IResourceUsage return _spi.register_resource_usage_listener(listener); } +std::unique_ptr<vespalib::IDestructorCallback> +PersistenceProviderWrapper::register_executor(std::shared_ptr<spi::BucketExecutor> executor) +{ + return _spi.register_executor(std::move(executor)); +} + spi::Result PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp timestamp, diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index b07f9c5e0f5..085a60c0e86 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -110,6 +110,7 @@ public: const spi::Bucket& target, spi::Context&) override; spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor>) override; }; } // storage diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 74ad5d7f2ce..acccbb8b9b9 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -14,6 +14,7 @@ #include <vespa/fastos/file.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/persistence/spi/test.h> +#include <vespa/persistence/spi/bucket_tasks.h> #include <vespa/storage/bucketdb/bucketmanager.h> #include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> @@ -24,6 +25,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/vdslib/state/random.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/gate.h> #include <atomic> #include <thread> @@ -35,6 +37,7 @@ using document::Document; using namespace storage::api; using storage::spi::test::makeSpiBucket; using document::test::makeDocumentBucket; +using vespalib::IDestructorCallback; using namespace ::testing; #define ASSERT_SINGLE_REPLY(replytype, reply, link, time) \ @@ -409,6 +412,50 @@ TEST_F(FileStorManagerTest, put) { } } +TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { + TestFileStorComponents c(*this); + + setClusterState("storage:3 distributor:3"); + EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); + + auto executor = getDummyPersistence().get_bucket_executor(); + ASSERT_TRUE(executor); + + spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); + std::atomic<size_t> numInvocations(0); + auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { + numInvocations++; + })); + ASSERT_TRUE(response); + EXPECT_EQ(0, numInvocations); + response->run(spi::Bucket(), {}); + EXPECT_EQ(1, numInvocations); +} + +TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { + TestFileStorComponents c(*this); + + setClusterState("storage:3 distributor:3"); + EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); + + auto executor = getDummyPersistence().get_bucket_executor(); + ASSERT_TRUE(executor); + + spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); + + createBucket(b1.getBucketId()); + + std::atomic<size_t> numInvocations(0); + vespalib::Gate gate; + auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { + numInvocations++; + gate.countDown(); + })); + EXPECT_FALSE(response); + gate.await(); + EXPECT_EQ(1, numInvocations); +} + TEST_F(FileStorManagerTest, state_change) { TestFileStorComponents c(*this); diff --git a/storage/src/vespa/storage/common/messagebucket.cpp b/storage/src/vespa/storage/common/messagebucket.cpp index 61283fd3d04..3aa90138f36 100644 --- a/storage/src/vespa/storage/common/messagebucket.cpp +++ b/storage/src/vespa/storage/common/messagebucket.cpp @@ -66,6 +66,8 @@ getStorageMessageBucket(const api::StorageMessage& msg) return static_cast<const ReadBucketInfo&>(msg).getBucket(); case RecheckBucketInfoCommand::ID: return static_cast<const RecheckBucketInfoCommand&>(msg).getBucket(); + case RunTaskCommand::ID: + return static_cast<const RunTaskCommand&>(msg).getBucket(); default: break; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 4b105f6688f..b8ed6b8ec91 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -6,6 +6,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/vespalib/util/destructor_callbacks.h> namespace storage { @@ -96,6 +97,18 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider {} MessageTracker::UP +AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const { + auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + spi::Bucket bucket(cmd.getBucket()); + auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)); + cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone))); + return tracker; +} + +MessageTracker::UP AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) const { MessageTracker & tracker = *trackerUP; diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 92bf72e7c51..2d6b37a1cdd 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -2,6 +2,7 @@ #pragma once #include "types.h" +#include "messages.h" #include <vespa/storageapi/message/persistence.h> namespace document { class BucketIdFactory; } @@ -25,6 +26,7 @@ public: MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: static bool tasConditionExists(const api::TestAndSetCommand & cmd); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 0f9c9894615..aadc58c9af6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -972,7 +972,11 @@ void FileStorManager::initialize_bucket_databases_from_provider() { std::unique_ptr<spi::BucketTask> FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) { - (void) bucket; + StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( + bucket.getBucketId(), "FileStorManager::execute")); + if (entry.exist()) { + _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task))); + } return task; } diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 90597a49ad7..7ccb3ee895d 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -2,6 +2,7 @@ #include "messages.h" #include <ostream> +#include <cassert> using document::BucketSpace; @@ -177,4 +178,43 @@ AbortBucketOperationsCommand::makeReply() { return std::make_unique<AbortBucketOperationsReply>(*this); } +std::unique_ptr<api::StorageReply> +RunTaskCommand::makeReply() { + return std::make_unique<RunTaskReply>(*this); +} + +RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) + : api::InternalCommand(ID), + _task(std::move(task)), + _bucket(bucket) +{ + assert(_task); +} + +RunTaskCommand::~RunTaskCommand() = default; + +void +RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { + out << "RunTaskCommand(" << _bucket <<")"; + + if (verbose) { + out << " : "; + InternalCommand::print(out, true, indent); + } +} + +RunTaskReply::RunTaskReply(const RunTaskCommand& cmd) + : api::InternalReply(ID, cmd) +{} + +void +RunTaskReply::print(std::ostream& out, bool verbose, const std::string& indent) const { + out << "RunTaskReply()"; + + if (verbose) { + out << " : "; + InternalReply::print(out, true, indent); + } +} + } diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index a465437ae21..043747d10d2 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -6,6 +6,7 @@ #include <vespa/persistence/spi/bucket.h> #include <vespa/persistence/spi/selection.h> #include <vespa/persistence/spi/read_consistency.h> +#include <vespa/persistence/spi/bucketexecutor.h> namespace storage { @@ -16,7 +17,7 @@ private: uint32_t _maxByteSize; public: - static const uint32_t ID = 1001; + static constexpr uint32_t ID = 1001; typedef std::unique_ptr<GetIterCommand> UP; typedef std::shared_ptr<GetIterCommand> SP; @@ -50,7 +51,7 @@ private: public: typedef std::unique_ptr<GetIterReply> UP; typedef std::shared_ptr<GetIterReply> SP; - static const uint32_t ID = 1002; + static constexpr uint32_t ID = 1002; explicit GetIterReply(GetIterCommand& cmd); ~GetIterReply() override; @@ -80,7 +81,7 @@ class CreateIteratorCommand : public api::InternalCommand spi::ReadConsistency _readConsistency; public: - static const uint32_t ID = 1003; + static constexpr uint32_t ID = 1003; typedef std::unique_ptr<CreateIteratorCommand> UP; typedef std::shared_ptr<CreateIteratorCommand> SP; @@ -114,7 +115,7 @@ class CreateIteratorReply : public api::InternalReply document::Bucket _bucket; spi::IteratorId _iteratorId; public: - static const uint32_t ID = 1004; + static constexpr uint32_t ID = 1004; typedef std::unique_ptr<CreateIteratorReply> UP; typedef std::shared_ptr<CreateIteratorReply> SP; @@ -132,7 +133,7 @@ class DestroyIteratorCommand : public api::InternalCommand { spi::IteratorId _iteratorId; public: - static const uint32_t ID = 1005; + static constexpr uint32_t ID = 1005; typedef std::unique_ptr<DestroyIteratorCommand> UP; typedef std::shared_ptr<DestroyIteratorCommand> SP; @@ -150,7 +151,7 @@ class DestroyIteratorReply : public api::InternalReply { spi::IteratorId _iteratorId; public: - static const uint32_t ID = 1006; + static constexpr uint32_t ID = 1006; typedef std::unique_ptr<DestroyIteratorReply> UP; typedef std::shared_ptr<DestroyIteratorReply> SP; @@ -164,7 +165,7 @@ class RecheckBucketInfoCommand : public api::InternalCommand { document::Bucket _bucket; public: - static const uint32_t ID = 1007; + static constexpr uint32_t ID = 1007; typedef std::shared_ptr<RecheckBucketInfoCommand> SP; typedef std::unique_ptr<RecheckBucketInfoCommand> UP; @@ -182,7 +183,7 @@ class RecheckBucketInfoReply : public api::InternalReply { document::Bucket _bucket; public: - static const uint32_t ID = 1008; + static constexpr uint32_t ID = 1008; typedef std::shared_ptr<RecheckBucketInfoReply> SP; typedef std::unique_ptr<RecheckBucketInfoReply> UP; @@ -206,7 +207,7 @@ public: } }; - static const uint32_t ID = 1009; + static constexpr uint32_t ID = 1009; typedef std::shared_ptr<AbortBucketOperationsCommand> SP; typedef std::shared_ptr<const AbortBucketOperationsCommand> CSP; private: @@ -227,7 +228,7 @@ public: class AbortBucketOperationsReply : public api::InternalReply { public: - static const uint32_t ID = 1010; + static constexpr uint32_t ID = 1010; typedef std::shared_ptr<AbortBucketOperationsReply> SP; typedef std::shared_ptr<const AbortBucketOperationsReply> CSP; @@ -237,5 +238,36 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; }; + +// Internal Command task for bringing along a Bucket and a BucketTask in +// the inner workings of the storagelink chain. +class RunTaskCommand : public api::InternalCommand { +public: + static constexpr uint32_t ID = 1011; + RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task); + ~RunTaskCommand(); + + document::Bucket getBucket() const override { return _bucket.getBucket(); } + std::unique_ptr<api::StorageReply> makeReply() override; + spi::BucketTask & task() & { + return *_task; + } + + void print(std::ostream& out, bool verbose, const std::string& indent) const override; +private: + std::unique_ptr<spi::BucketTask> _task; + spi::Bucket _bucket; +}; + +// Simple reply for matching the RunTaskCommand +class RunTaskReply : public api::InternalReply +{ +public: + explicit RunTaskReply(const RunTaskCommand&); + void print(std::ostream& out, bool verbose, const std::string& indent) const override; +private: + static constexpr uint32_t ID = 1012; +}; + } // ns storage diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index cbe5454f4e7..38ffd2c57e7 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -75,6 +75,8 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); + case RunTaskCommand::ID: + return _asyncHandler.handleRunTask(static_cast<RunTaskCommand &>(msg), std::move(tracker)); default: LOG(warning, "Persistence handler received unhandled internal command %s", msg.toString().c_str()); break; |