summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-20 13:02:35 +0100
committerGitHub <noreply@github.com>2021-01-20 13:02:35 +0100
commit669dcc1c35f21672fb63363a5a7f2f7989f20cef (patch)
treee2dd3036aeda242958c21cd137575cbabfbf1312
parent6f649d5dd2234c7dd581f9ac206663cbfa6b8ae8 (diff)
parentff551559deacaacc3bb77699686bb6c65e08a818 (diff)
Merge pull request #16098 from vespa-engine/balder/implement-bucketexecutor-for-filestormanager
Implement bucketexecutor interface and.
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp25
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h3
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp6
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h2
-rw-r--r--persistence/src/vespa/persistence/spi/bucket_tasks.h36
-rw-r--r--persistence/src/vespa/persistence/spi/bucketexecutor.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp26
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h4
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp6
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h1
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp47
-rw-r--r--storage/src/vespa/storage/common/messagebucket.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp40
-rw-r--r--storage/src/vespa/storage/persistence/messages.h52
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
18 files changed, 257 insertions, 21 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 0c1c5db69d6..0865500d3c0 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -8,6 +8,7 @@
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/persistence/spi/i_resource_usage_listener.h>
#include <vespa/persistence/spi/resource_usage.h>
+#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/util/crc.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -863,6 +864,30 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste
return {};
}
+namespace {
+
+class SyncExecutorOnDestruction : public vespalib::IDestructorCallback {
+public:
+ explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
+ ~SyncExecutorOnDestruction() override {
+ if (_executor) {
+ _executor->sync();
+ }
+ }
+private:
+ std::shared_ptr<BucketExecutor> _executor;
+};
+
+}
+
+std::unique_ptr<vespalib::IDestructorCallback>
+DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor)
+{
+ assert(_bucket_executor.expired());
+ _bucket_executor = executor;
+ return std::make_unique<SyncExecutorOnDestruction>(executor);
+}
+
std::string
DummyPersistence::dumpBucket(const Bucket& b) const
{
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index c37af0d33eb..ad50648abaf 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -176,6 +176,8 @@ public:
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
+ std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); }
/**
* The following methods are used only for unit testing.
@@ -213,6 +215,7 @@ private:
std::condition_variable _cond;
std::unique_ptr<ClusterState> _clusterState;
+ std::weak_ptr<BucketExecutor> _bucket_executor;
std::unique_ptr<document::select::Node> parseDocumentSelection(
const string& documentSelection,
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index facdb2cadfa..1d873e9a20e 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -30,10 +30,4 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const
return BucketIdListResult(list);
}
-std::unique_ptr<vespalib::IDestructorCallback>
-AbstractPersistenceProvider::register_executor(std::shared_ptr<BucketExecutor>)
-{
- return {};
-}
-
}
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index 5023febe9a2..016928ab10e 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -48,8 +48,6 @@ public:
* Default impl empty.
*/
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
-
- std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
};
}
diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h
new file mode 100644
index 00000000000..0b9c283817d
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h
@@ -0,0 +1,36 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bucketexecutor.h"
+
+namespace storage::spi {
+
+/**
+ * Simple Bucket task that wraps a lambda that does the job.
+ */
+template<class FunctionType>
+class LambdaBucketTask : public BucketTask {
+public:
+ explicit LambdaBucketTask(FunctionType &&func)
+ : _func(std::move(func))
+ {}
+
+ ~LambdaBucketTask() override = default;
+
+ void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) override {
+ _func(bucket, std::move(onComplete));
+ }
+
+private:
+ FunctionType _func;
+};
+
+template<class FunctionType>
+std::unique_ptr<BucketTask>
+makeBucketTask(FunctionType &&function) {
+ return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+}
+
+}
diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h
index 07c86fd5ffb..8237b78cca0 100644
--- a/persistence/src/vespa/persistence/spi/bucketexecutor.h
+++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h
@@ -3,7 +3,8 @@
#pragma once
#include "bucket.h"
-#include "operationcomplete.h"
+
+namespace vespalib { class IDestructorCallback; }
namespace storage::spi {
@@ -17,7 +18,7 @@ namespace storage::spi {
class BucketTask {
public:
virtual ~BucketTask() = default;
- virtual void run(const Bucket & bucket, std::unique_ptr<OperationComplete> onComplete) = 0;
+ virtual void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) = 0;
};
/**
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 0d5d8ad3144..8bec6f9dd68 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -3,6 +3,7 @@
#include "persistenceengine.h"
#include "ipersistenceengineowner.h"
#include "transport_latch.h"
+#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/datatype/documenttype.h>
@@ -18,6 +19,7 @@ LOG_SETUP(".proton.persistenceengine.persistenceengine");
using document::Document;
using document::DocumentId;
using storage::spi::BucketChecksum;
+using storage::spi::BucketExecutor;
using storage::spi::BucketIdListResult;
using storage::spi::BucketInfo;
using storage::spi::BucketInfoResult;
@@ -737,4 +739,28 @@ PersistenceEngine::getWLock() const
return WriteGuard(_rwMutex);
}
+namespace {
+
+class SyncExecutorOnDestruction : public vespalib::IDestructorCallback {
+public:
+ explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
+ ~SyncExecutorOnDestruction() override {
+ if (_executor) {
+ _executor->sync();
+ }
+ }
+private:
+ std::shared_ptr<BucketExecutor> _executor;
+};
+
+}
+
+std::unique_ptr<vespalib::IDestructorCallback>
+PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor)
+{
+ assert(_bucket_executor.expired());
+ _bucket_executor = executor;
+ return std::make_unique<SyncExecutorOnDestruction>(executor);
+}
+
} // storage
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 659156fdea0..b5a99525575 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -39,6 +39,7 @@ private:
using TimestampList = storage::spi::TimestampList;
using UpdateResult = storage::spi::UpdateResult;
using OperationComplete = storage::spi::OperationComplete;
+ using BucketExecutor = storage::spi::BucketExecutor;
struct IteratorEntry {
PersistenceHandlerSequence handler_sequence;
@@ -73,6 +74,7 @@ private:
mutable ExtraModifiedBuckets _extraModifiedBuckets;
mutable std::shared_mutex _rwMutex;
std::shared_ptr<ResourceUsageTracker> _resource_usage_tracker;
+ std::weak_ptr<BucketExecutor> _bucket_executor;
using ReadGuard = std::shared_lock<std::shared_mutex>;
using WriteGuard = std::unique_lock<std::shared_mutex>;
@@ -116,12 +118,14 @@ public:
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
void destroyIterators();
void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler);
void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler);
void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler);
WriteGuard getWLock() const;
ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; }
+ std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); }
};
}
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index bbde377fdec..21a94a3e957 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -209,6 +209,12 @@ PersistenceProviderWrapper::register_resource_usage_listener(spi::IResourceUsage
return _spi.register_resource_usage_listener(listener);
}
+std::unique_ptr<vespalib::IDestructorCallback>
+PersistenceProviderWrapper::register_executor(std::shared_ptr<spi::BucketExecutor> executor)
+{
+ return _spi.register_executor(std::move(executor));
+}
+
spi::Result
PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket,
spi::Timestamp timestamp,
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index b07f9c5e0f5..085a60c0e86 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -110,6 +110,7 @@ public:
const spi::Bucket& target, spi::Context&) override;
spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override;
std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor>) override;
};
} // storage
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 74ad5d7f2ce..acccbb8b9b9 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -14,6 +14,7 @@
#include <vespa/fastos/file.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/persistence/spi/test.h>
+#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
#include <vespa/storage/persistence/bucketownershipnotifier.h>
#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
@@ -24,6 +25,7 @@
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/vdslib/state/random.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/gate.h>
#include <atomic>
#include <thread>
@@ -35,6 +37,7 @@ using document::Document;
using namespace storage::api;
using storage::spi::test::makeSpiBucket;
using document::test::makeDocumentBucket;
+using vespalib::IDestructorCallback;
using namespace ::testing;
#define ASSERT_SINGLE_REPLY(replytype, reply, link, time) \
@@ -409,6 +412,50 @@ TEST_F(FileStorManagerTest, put) {
}
}
+TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) {
+ TestFileStorComponents c(*this);
+
+ setClusterState("storage:3 distributor:3");
+ EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
+
+ auto executor = getDummyPersistence().get_bucket_executor();
+ ASSERT_TRUE(executor);
+
+ spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
+ std::atomic<size_t> numInvocations(0);
+ auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
+ numInvocations++;
+ }));
+ ASSERT_TRUE(response);
+ EXPECT_EQ(0, numInvocations);
+ response->run(spi::Bucket(), {});
+ EXPECT_EQ(1, numInvocations);
+}
+
+TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
+ TestFileStorComponents c(*this);
+
+ setClusterState("storage:3 distributor:3");
+ EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
+
+ auto executor = getDummyPersistence().get_bucket_executor();
+ ASSERT_TRUE(executor);
+
+ spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
+
+ createBucket(b1.getBucketId());
+
+ std::atomic<size_t> numInvocations(0);
+ vespalib::Gate gate;
+ auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
+ numInvocations++;
+ gate.countDown();
+ }));
+ EXPECT_FALSE(response);
+ gate.await();
+ EXPECT_EQ(1, numInvocations);
+}
+
TEST_F(FileStorManagerTest, state_change) {
TestFileStorComponents c(*this);
diff --git a/storage/src/vespa/storage/common/messagebucket.cpp b/storage/src/vespa/storage/common/messagebucket.cpp
index 61283fd3d04..3aa90138f36 100644
--- a/storage/src/vespa/storage/common/messagebucket.cpp
+++ b/storage/src/vespa/storage/common/messagebucket.cpp
@@ -66,6 +66,8 @@ getStorageMessageBucket(const api::StorageMessage& msg)
return static_cast<const ReadBucketInfo&>(msg).getBucket();
case RecheckBucketInfoCommand::ID:
return static_cast<const RecheckBucketInfoCommand&>(msg).getBucket();
+ case RunTaskCommand::ID:
+ return static_cast<const RunTaskCommand&>(msg).getBucket();
default:
break;
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 4b105f6688f..b8ed6b8ec91 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -6,6 +6,7 @@
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
namespace storage {
@@ -96,6 +97,18 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider
{}
MessageTracker::UP
+AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const {
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ spi::Bucket bucket(cmd.getBucket());
+ auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task));
+ cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) const
{
MessageTracker & tracker = *trackerUP;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 92bf72e7c51..2d6b37a1cdd 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -2,6 +2,7 @@
#pragma once
#include "types.h"
+#include "messages.h"
#include <vespa/storageapi/message/persistence.h>
namespace document { class BucketIdFactory; }
@@ -25,6 +26,7 @@ public:
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 0f9c9894615..aadc58c9af6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -972,7 +972,11 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
std::unique_ptr<spi::BucketTask>
FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) {
- (void) bucket;
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
+ bucket.getBucketId(), "FileStorManager::execute"));
+ if (entry.exist()) {
+ _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task)));
+ }
return task;
}
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 90597a49ad7..7ccb3ee895d 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -2,6 +2,7 @@
#include "messages.h"
#include <ostream>
+#include <cassert>
using document::BucketSpace;
@@ -177,4 +178,43 @@ AbortBucketOperationsCommand::makeReply() {
return std::make_unique<AbortBucketOperationsReply>(*this);
}
+std::unique_ptr<api::StorageReply>
+RunTaskCommand::makeReply() {
+ return std::make_unique<RunTaskReply>(*this);
+}
+
+RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task)
+ : api::InternalCommand(ID),
+ _task(std::move(task)),
+ _bucket(bucket)
+{
+ assert(_task);
+}
+
+RunTaskCommand::~RunTaskCommand() = default;
+
+void
+RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const {
+ out << "RunTaskCommand(" << _bucket <<")";
+
+ if (verbose) {
+ out << " : ";
+ InternalCommand::print(out, true, indent);
+ }
+}
+
+RunTaskReply::RunTaskReply(const RunTaskCommand& cmd)
+ : api::InternalReply(ID, cmd)
+{}
+
+void
+RunTaskReply::print(std::ostream& out, bool verbose, const std::string& indent) const {
+ out << "RunTaskReply()";
+
+ if (verbose) {
+ out << " : ";
+ InternalReply::print(out, true, indent);
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index a465437ae21..043747d10d2 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -6,6 +6,7 @@
#include <vespa/persistence/spi/bucket.h>
#include <vespa/persistence/spi/selection.h>
#include <vespa/persistence/spi/read_consistency.h>
+#include <vespa/persistence/spi/bucketexecutor.h>
namespace storage {
@@ -16,7 +17,7 @@ private:
uint32_t _maxByteSize;
public:
- static const uint32_t ID = 1001;
+ static constexpr uint32_t ID = 1001;
typedef std::unique_ptr<GetIterCommand> UP;
typedef std::shared_ptr<GetIterCommand> SP;
@@ -50,7 +51,7 @@ private:
public:
typedef std::unique_ptr<GetIterReply> UP;
typedef std::shared_ptr<GetIterReply> SP;
- static const uint32_t ID = 1002;
+ static constexpr uint32_t ID = 1002;
explicit GetIterReply(GetIterCommand& cmd);
~GetIterReply() override;
@@ -80,7 +81,7 @@ class CreateIteratorCommand : public api::InternalCommand
spi::ReadConsistency _readConsistency;
public:
- static const uint32_t ID = 1003;
+ static constexpr uint32_t ID = 1003;
typedef std::unique_ptr<CreateIteratorCommand> UP;
typedef std::shared_ptr<CreateIteratorCommand> SP;
@@ -114,7 +115,7 @@ class CreateIteratorReply : public api::InternalReply
document::Bucket _bucket;
spi::IteratorId _iteratorId;
public:
- static const uint32_t ID = 1004;
+ static constexpr uint32_t ID = 1004;
typedef std::unique_ptr<CreateIteratorReply> UP;
typedef std::shared_ptr<CreateIteratorReply> SP;
@@ -132,7 +133,7 @@ class DestroyIteratorCommand : public api::InternalCommand
{
spi::IteratorId _iteratorId;
public:
- static const uint32_t ID = 1005;
+ static constexpr uint32_t ID = 1005;
typedef std::unique_ptr<DestroyIteratorCommand> UP;
typedef std::shared_ptr<DestroyIteratorCommand> SP;
@@ -150,7 +151,7 @@ class DestroyIteratorReply : public api::InternalReply
{
spi::IteratorId _iteratorId;
public:
- static const uint32_t ID = 1006;
+ static constexpr uint32_t ID = 1006;
typedef std::unique_ptr<DestroyIteratorReply> UP;
typedef std::shared_ptr<DestroyIteratorReply> SP;
@@ -164,7 +165,7 @@ class RecheckBucketInfoCommand : public api::InternalCommand
{
document::Bucket _bucket;
public:
- static const uint32_t ID = 1007;
+ static constexpr uint32_t ID = 1007;
typedef std::shared_ptr<RecheckBucketInfoCommand> SP;
typedef std::unique_ptr<RecheckBucketInfoCommand> UP;
@@ -182,7 +183,7 @@ class RecheckBucketInfoReply : public api::InternalReply
{
document::Bucket _bucket;
public:
- static const uint32_t ID = 1008;
+ static constexpr uint32_t ID = 1008;
typedef std::shared_ptr<RecheckBucketInfoReply> SP;
typedef std::unique_ptr<RecheckBucketInfoReply> UP;
@@ -206,7 +207,7 @@ public:
}
};
- static const uint32_t ID = 1009;
+ static constexpr uint32_t ID = 1009;
typedef std::shared_ptr<AbortBucketOperationsCommand> SP;
typedef std::shared_ptr<const AbortBucketOperationsCommand> CSP;
private:
@@ -227,7 +228,7 @@ public:
class AbortBucketOperationsReply : public api::InternalReply
{
public:
- static const uint32_t ID = 1010;
+ static constexpr uint32_t ID = 1010;
typedef std::shared_ptr<AbortBucketOperationsReply> SP;
typedef std::shared_ptr<const AbortBucketOperationsReply> CSP;
@@ -237,5 +238,36 @@ public:
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
};
+
+// Internal Command task for bringing along a Bucket and a BucketTask in
+// the inner workings of the storagelink chain.
+class RunTaskCommand : public api::InternalCommand {
+public:
+ static constexpr uint32_t ID = 1011;
+ RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task);
+ ~RunTaskCommand();
+
+ document::Bucket getBucket() const override { return _bucket.getBucket(); }
+ std::unique_ptr<api::StorageReply> makeReply() override;
+ spi::BucketTask & task() & {
+ return *_task;
+ }
+
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+private:
+ std::unique_ptr<spi::BucketTask> _task;
+ spi::Bucket _bucket;
+};
+
+// Simple reply for matching the RunTaskCommand
+class RunTaskReply : public api::InternalReply
+{
+public:
+ explicit RunTaskReply(const RunTaskCommand&);
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+private:
+ static constexpr uint32_t ID = 1012;
+};
+
} // ns storage
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index cbe5454f4e7..38ffd2c57e7 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -75,6 +75,8 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
case RecheckBucketInfoCommand::ID:
return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
+ case RunTaskCommand::ID:
+ return _asyncHandler.handleRunTask(static_cast<RunTaskCommand &>(msg), std::move(tracker));
default:
LOG(warning, "Persistence handler received unhandled internal command %s", msg.toString().c_str());
break;