summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-20 13:02:35 +0100
committerGitHub <noreply@github.com>2021-01-20 13:02:35 +0100
commit669dcc1c35f21672fb63363a5a7f2f7989f20cef (patch)
treee2dd3036aeda242958c21cd137575cbabfbf1312 /storage
parent6f649d5dd2234c7dd581f9ac206663cbfa6b8ae8 (diff)
parentff551559deacaacc3bb77699686bb6c65e08a818 (diff)
Merge pull request #16098 from vespa-engine/balder/implement-bucketexecutor-for-filestormanager
Implement bucketexecutor interface and.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp6
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h1
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp47
-rw-r--r--storage/src/vespa/storage/common/messagebucket.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp40
-rw-r--r--storage/src/vespa/storage/persistence/messages.h52
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
10 files changed, 160 insertions, 11 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index bbde377fdec..21a94a3e957 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -209,6 +209,12 @@ PersistenceProviderWrapper::register_resource_usage_listener(spi::IResourceUsage
return _spi.register_resource_usage_listener(listener);
}
+std::unique_ptr<vespalib::IDestructorCallback>
+PersistenceProviderWrapper::register_executor(std::shared_ptr<spi::BucketExecutor> executor)
+{
+ return _spi.register_executor(std::move(executor));
+}
+
spi::Result
PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket,
spi::Timestamp timestamp,
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index b07f9c5e0f5..085a60c0e86 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -110,6 +110,7 @@ public:
const spi::Bucket& target, spi::Context&) override;
spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override;
std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor>) override;
};
} // storage
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 74ad5d7f2ce..acccbb8b9b9 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -14,6 +14,7 @@
#include <vespa/fastos/file.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/persistence/spi/test.h>
+#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
#include <vespa/storage/persistence/bucketownershipnotifier.h>
#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
@@ -24,6 +25,7 @@
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/vdslib/state/random.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/gate.h>
#include <atomic>
#include <thread>
@@ -35,6 +37,7 @@ using document::Document;
using namespace storage::api;
using storage::spi::test::makeSpiBucket;
using document::test::makeDocumentBucket;
+using vespalib::IDestructorCallback;
using namespace ::testing;
#define ASSERT_SINGLE_REPLY(replytype, reply, link, time) \
@@ -409,6 +412,50 @@ TEST_F(FileStorManagerTest, put) {
}
}
+TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) {
+ TestFileStorComponents c(*this);
+
+ setClusterState("storage:3 distributor:3");
+ EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
+
+ auto executor = getDummyPersistence().get_bucket_executor();
+ ASSERT_TRUE(executor);
+
+ spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
+ std::atomic<size_t> numInvocations(0);
+ auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
+ numInvocations++;
+ }));
+ ASSERT_TRUE(response);
+ EXPECT_EQ(0, numInvocations);
+ response->run(spi::Bucket(), {});
+ EXPECT_EQ(1, numInvocations);
+}
+
+TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) {
+ TestFileStorComponents c(*this);
+
+ setClusterState("storage:3 distributor:3");
+ EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp());
+
+ auto executor = getDummyPersistence().get_bucket_executor();
+ ASSERT_TRUE(executor);
+
+ spi::Bucket b1 = makeSpiBucket(document::BucketId(1));
+
+ createBucket(b1.getBucketId());
+
+ std::atomic<size_t> numInvocations(0);
+ vespalib::Gate gate;
+ auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) {
+ numInvocations++;
+ gate.countDown();
+ }));
+ EXPECT_FALSE(response);
+ gate.await();
+ EXPECT_EQ(1, numInvocations);
+}
+
TEST_F(FileStorManagerTest, state_change) {
TestFileStorComponents c(*this);
diff --git a/storage/src/vespa/storage/common/messagebucket.cpp b/storage/src/vespa/storage/common/messagebucket.cpp
index 61283fd3d04..3aa90138f36 100644
--- a/storage/src/vespa/storage/common/messagebucket.cpp
+++ b/storage/src/vespa/storage/common/messagebucket.cpp
@@ -66,6 +66,8 @@ getStorageMessageBucket(const api::StorageMessage& msg)
return static_cast<const ReadBucketInfo&>(msg).getBucket();
case RecheckBucketInfoCommand::ID:
return static_cast<const RecheckBucketInfoCommand&>(msg).getBucket();
+ case RunTaskCommand::ID:
+ return static_cast<const RunTaskCommand&>(msg).getBucket();
default:
break;
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 4b105f6688f..b8ed6b8ec91 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -6,6 +6,7 @@
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
namespace storage {
@@ -96,6 +97,18 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider
{}
MessageTracker::UP
+AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const {
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ spi::Bucket bucket(cmd.getBucket());
+ auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task));
+ cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone)));
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) const
{
MessageTracker & tracker = *trackerUP;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 92bf72e7c51..2d6b37a1cdd 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -2,6 +2,7 @@
#pragma once
#include "types.h"
+#include "messages.h"
#include <vespa/storageapi/message/persistence.h>
namespace document { class BucketIdFactory; }
@@ -25,6 +26,7 @@ public:
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 0f9c9894615..aadc58c9af6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -972,7 +972,11 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
std::unique_ptr<spi::BucketTask>
FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) {
- (void) bucket;
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
+ bucket.getBucketId(), "FileStorManager::execute"));
+ if (entry.exist()) {
+ _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task)));
+ }
return task;
}
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 90597a49ad7..7ccb3ee895d 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -2,6 +2,7 @@
#include "messages.h"
#include <ostream>
+#include <cassert>
using document::BucketSpace;
@@ -177,4 +178,43 @@ AbortBucketOperationsCommand::makeReply() {
return std::make_unique<AbortBucketOperationsReply>(*this);
}
+std::unique_ptr<api::StorageReply>
+RunTaskCommand::makeReply() {
+ return std::make_unique<RunTaskReply>(*this);
+}
+
+RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task)
+ : api::InternalCommand(ID),
+ _task(std::move(task)),
+ _bucket(bucket)
+{
+ assert(_task);
+}
+
+RunTaskCommand::~RunTaskCommand() = default;
+
+void
+RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const {
+ out << "RunTaskCommand(" << _bucket <<")";
+
+ if (verbose) {
+ out << " : ";
+ InternalCommand::print(out, true, indent);
+ }
+}
+
+RunTaskReply::RunTaskReply(const RunTaskCommand& cmd)
+ : api::InternalReply(ID, cmd)
+{}
+
+void
+RunTaskReply::print(std::ostream& out, bool verbose, const std::string& indent) const {
+ out << "RunTaskReply()";
+
+ if (verbose) {
+ out << " : ";
+ InternalReply::print(out, true, indent);
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index a465437ae21..043747d10d2 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -6,6 +6,7 @@
#include <vespa/persistence/spi/bucket.h>
#include <vespa/persistence/spi/selection.h>
#include <vespa/persistence/spi/read_consistency.h>
+#include <vespa/persistence/spi/bucketexecutor.h>
namespace storage {
@@ -16,7 +17,7 @@ private:
uint32_t _maxByteSize;
public:
- static const uint32_t ID = 1001;
+ static constexpr uint32_t ID = 1001;
typedef std::unique_ptr<GetIterCommand> UP;
typedef std::shared_ptr<GetIterCommand> SP;
@@ -50,7 +51,7 @@ private:
public:
typedef std::unique_ptr<GetIterReply> UP;
typedef std::shared_ptr<GetIterReply> SP;
- static const uint32_t ID = 1002;
+ static constexpr uint32_t ID = 1002;
explicit GetIterReply(GetIterCommand& cmd);
~GetIterReply() override;
@@ -80,7 +81,7 @@ class CreateIteratorCommand : public api::InternalCommand
spi::ReadConsistency _readConsistency;
public:
- static const uint32_t ID = 1003;
+ static constexpr uint32_t ID = 1003;
typedef std::unique_ptr<CreateIteratorCommand> UP;
typedef std::shared_ptr<CreateIteratorCommand> SP;
@@ -114,7 +115,7 @@ class CreateIteratorReply : public api::InternalReply
document::Bucket _bucket;
spi::IteratorId _iteratorId;
public:
- static const uint32_t ID = 1004;
+ static constexpr uint32_t ID = 1004;
typedef std::unique_ptr<CreateIteratorReply> UP;
typedef std::shared_ptr<CreateIteratorReply> SP;
@@ -132,7 +133,7 @@ class DestroyIteratorCommand : public api::InternalCommand
{
spi::IteratorId _iteratorId;
public:
- static const uint32_t ID = 1005;
+ static constexpr uint32_t ID = 1005;
typedef std::unique_ptr<DestroyIteratorCommand> UP;
typedef std::shared_ptr<DestroyIteratorCommand> SP;
@@ -150,7 +151,7 @@ class DestroyIteratorReply : public api::InternalReply
{
spi::IteratorId _iteratorId;
public:
- static const uint32_t ID = 1006;
+ static constexpr uint32_t ID = 1006;
typedef std::unique_ptr<DestroyIteratorReply> UP;
typedef std::shared_ptr<DestroyIteratorReply> SP;
@@ -164,7 +165,7 @@ class RecheckBucketInfoCommand : public api::InternalCommand
{
document::Bucket _bucket;
public:
- static const uint32_t ID = 1007;
+ static constexpr uint32_t ID = 1007;
typedef std::shared_ptr<RecheckBucketInfoCommand> SP;
typedef std::unique_ptr<RecheckBucketInfoCommand> UP;
@@ -182,7 +183,7 @@ class RecheckBucketInfoReply : public api::InternalReply
{
document::Bucket _bucket;
public:
- static const uint32_t ID = 1008;
+ static constexpr uint32_t ID = 1008;
typedef std::shared_ptr<RecheckBucketInfoReply> SP;
typedef std::unique_ptr<RecheckBucketInfoReply> UP;
@@ -206,7 +207,7 @@ public:
}
};
- static const uint32_t ID = 1009;
+ static constexpr uint32_t ID = 1009;
typedef std::shared_ptr<AbortBucketOperationsCommand> SP;
typedef std::shared_ptr<const AbortBucketOperationsCommand> CSP;
private:
@@ -227,7 +228,7 @@ public:
class AbortBucketOperationsReply : public api::InternalReply
{
public:
- static const uint32_t ID = 1010;
+ static constexpr uint32_t ID = 1010;
typedef std::shared_ptr<AbortBucketOperationsReply> SP;
typedef std::shared_ptr<const AbortBucketOperationsReply> CSP;
@@ -237,5 +238,36 @@ public:
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
};
+
+// Internal Command task for bringing along a Bucket and a BucketTask in
+// the inner workings of the storagelink chain.
+class RunTaskCommand : public api::InternalCommand {
+public:
+ static constexpr uint32_t ID = 1011;
+ RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task);
+ ~RunTaskCommand();
+
+ document::Bucket getBucket() const override { return _bucket.getBucket(); }
+ std::unique_ptr<api::StorageReply> makeReply() override;
+ spi::BucketTask & task() & {
+ return *_task;
+ }
+
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+private:
+ std::unique_ptr<spi::BucketTask> _task;
+ spi::Bucket _bucket;
+};
+
+// Simple reply for matching the RunTaskCommand
+class RunTaskReply : public api::InternalReply
+{
+public:
+ explicit RunTaskReply(const RunTaskCommand&);
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+private:
+ static constexpr uint32_t ID = 1012;
+};
+
} // ns storage
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index cbe5454f4e7..38ffd2c57e7 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -75,6 +75,8 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
case RecheckBucketInfoCommand::ID:
return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
+ case RunTaskCommand::ID:
+ return _asyncHandler.handleRunTask(static_cast<RunTaskCommand &>(msg), std::move(tracker));
default:
LOG(warning, "Persistence handler received unhandled internal command %s", msg.toString().c_str());
break;