summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-18 15:50:16 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-18 15:50:16 +0000
commite6a8e2c087d25335784c05a8d01e1c1b665d3b3d (patch)
treec25e899bf5e052b805424e4caf943ee882ef198b /storage
parente099df397486313831f09befa1d081b178384632 (diff)
Factor out handling of the remaining messages.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/messages.h1
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp235
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h14
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h4
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp241
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h33
9 files changed, 293 insertions, 243 deletions
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index e4c5227f951..3d0dd183232 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -251,10 +251,11 @@ TestAndSetTest::createTestDocument()
return doc;
}
-document::Document::SP TestAndSetTest::retrieveTestDocument()
+document::Document::SP
+TestAndSetTest::retrieveTestDocument()
{
auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, document::AllFields::NAME);
- auto tracker = thread->handleGet(*get, createTracker(get, BUCKET));
+ auto tracker = thread->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET));
assert(tracker->getResult() == api::ReturnCode::Result::OK);
auto & reply = static_cast<api::GetReply &>(tracker->getReply());
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 5707c9ea687..aa22a67f747 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -11,6 +11,7 @@ vespa_add_library(storage_spersistence OBJECT
persistenceutil.cpp
processallhandler.cpp
provider_error_wrapper.cpp
+ simplemessagehandler.cpp
splitbitdetector.cpp
splitjoinhandler.cpp
testandsethelper.cpp
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index b771af18b17..d4888970732 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -6,7 +6,6 @@
#include <vespa/persistence/spi/bucket.h>
#include <vespa/persistence/spi/selection.h>
#include <vespa/persistence/spi/read_consistency.h>
-#include <vespa/vespalib/stllike/hash_set.h>
namespace storage {
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 74d5c304bfb..719c2130e1a 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -46,6 +46,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence
cfg.commonMergeChainOptimalizationMinimumSize),
_asyncHandler(_env, _spi, sequencedExecutor),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
+ _simpleHandler(_env, provider),
_thread()
{
_thread = _component->startThread(*this, 60s, 1s);
@@ -60,230 +61,12 @@ PersistenceThread::~PersistenceThread()
LOG(debug, "Persistence thread done with destruction");
}
-namespace {
-
-spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept {
- switch (consistency) {
- case api::InternalReadConsistency::Strong: return spi::ReadConsistency::STRONG;
- case api::InternalReadConsistency::Weak: return spi::ReadConsistency::WEAK;
- default: abort();
- }
-}
-
-document::FieldSet::SP
-getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, MessageTracker & tracker) {
- try {
- return repo.getFieldSet(name);
- } catch (document::FieldNotFoundException & e) {
- tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS,
- fmt("Field %s in fieldset %s not found in document", e.getFieldName().c_str(), to_str(name).c_str()));
- } catch (const vespalib::Exception & e) {
- tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS,
- fmt("Failed parsing fieldset %s with : %s", to_str(name).c_str(), e.getMessage().c_str()));
- }
- return document::FieldSet::SP();
-}
-
-}
-
-MessageTracker::UP
-PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker)
-{
- auto& metrics = _env._metrics.get[cmd.getLoadType()];
- tracker->setMetric(metrics);
- metrics.request_size.addValue(cmd.getApproxByteSize());
-
- auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFieldSet(), *tracker);
- if ( ! fieldSet) { return tracker; }
-
- tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
- spi::GetResult result = _spi.get(_env.getBucket(cmd.getDocumentId(), cmd.getBucket()),
- *fieldSet, cmd.getDocumentId(), tracker->context());
-
- if (tracker->checkForError(result)) {
- if (!result.hasDocument() && (document::FieldSet::Type::NONE != fieldSet->getType())) {
- metrics.notFound.inc();
- }
- tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp(),
- false, result.is_tombstone()));
- }
-
- return tracker;
-}
-
-MessageTracker::UP
-PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]);
- spi::Bucket b = spi::Bucket(cmd.getBucket());
- const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
- for (const api::Timestamp & token : tokens) {
- spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context());
- }
- return tracker;
-}
-
-MessageTracker::UP
-PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.createBuckets);
- LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
- }
- spi::Bucket spiBucket(cmd.getBucket());
- _spi.createBucket(spiBucket, tracker->context());
- if (cmd.getActive()) {
- _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
- }
- return tracker;
-}
-
-namespace {
-
-bool bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
- // Don't check document sizes, as background moving of documents in Proton
- // may trigger a change in size without any mutations taking place. This will
- // only take place when a document being moved was fed _prior_ to the change
- // where Proton starts reporting actual document sizes, and will eventually
- // converge to a stable value. But for now, ignore it to prevent false positive
- // error logs and non-deleted buckets.
- return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
-}
-
-}
-
-bool
-PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
-{
- spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
- if (result.hasError()) {
- LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
- bucket.toString().c_str(), result.getErrorMessage().c_str());
- return false;
- }
- api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
- // Don't check meta fields or active/ready fields since these are not
- // that important and ready may change under the hood in a race with
- // getModifiedBuckets(). If bucket is empty it means it has already
- // been deleted by a racing split/join.
- if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
- LOG(error,
- "Service layer bucket database and provider out of sync before "
- "deleting bucket %s! Service layer db had %s while provider says "
- "bucket has %s. Deletion has been rejected to ensure data is not "
- "lost, but bucket may remain out of sync until service has been "
- "restarted.",
- bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
- return false;
- }
- return true;
-}
-
-MessageTracker::UP
-PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.deleteBuckets);
- LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
- LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
- }
- spi::Bucket bucket(cmd.getBucket());
- if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
- return tracker;
- }
- _spi.deleteBucket(bucket, tracker->context());
- StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
- {
- StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
- if (entry.exist() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- cmd.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
- }
- return tracker;
-}
-
-MessageTracker::UP
-PersistenceThread::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]);
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context()));
- if (tracker->checkForError(result)) {
- auto reply = std::make_shared<GetIterReply>(cmd);
- reply->getEntries() = result.steal_entries();
- _env._metrics.visit[cmd.getLoadType()].
- documentsPerIterate.addValue(reply->getEntries().size());
- if (result.isCompleted()) {
- reply->setCompleted();
- }
- tracker->setReply(reply);
- }
- return tracker;
-}
-
-MessageTracker::UP
-PersistenceThread::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.readBucketList);
-
- assert(cmd.getPartition() == 0u);
- spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace()));
- if (tracker->checkForError(result)) {
- auto reply = std::make_shared<ReadBucketListReply>(cmd);
- result.getList().swap(reply->getBuckets());
- tracker->setReply(reply);
- }
-
- return tracker;
-}
-
-MessageTracker::UP
-PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.readBucketInfo);
- _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
- return tracker;
-}
-
-MessageTracker::UP
-PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.createIterator);
- auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFields(), *tracker);
- if ( ! fieldSet) { return tracker; }
-
- tracker->context().setReadConsistency(cmd.getReadConsistency());
- spi::CreateIteratorResult result(_spi.createIterator(
- spi::Bucket(cmd.getBucket()),
- std::move(fieldSet), cmd.getSelection(), cmd.getIncludedVersions(), tracker->context()));
- if (tracker->checkForError(result)) {
- tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
- }
- return tracker;
-}
-
MessageTracker::UP
PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker)
{
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
- return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
+ return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
case api::MessageType::PUT_ID:
return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVE_ID:
@@ -291,11 +74,11 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra
case api::MessageType::UPDATE_ID:
return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
case api::MessageType::REVERT_ID:
- return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
+ return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
@@ -316,13 +99,13 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
- return handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker));
+ return _simpleHandler.handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker));
case CreateIteratorCommand::ID:
- return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker));
+ return _simpleHandler.handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker));
case ReadBucketList::ID:
- return handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker));
+ return _simpleHandler.handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker));
case ReadBucketInfo::ID:
- return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
+ return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
case InternalBucketJoinCommand::ID:
return _splitJoinHandler.handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker));
case RecheckBucketInfoCommand::ID:
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 48223afd405..917812868a7 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -9,6 +9,7 @@
#include "persistenceutil.h"
#include "provider_error_wrapper.h"
#include "splitjoinhandler.h"
+#include "simplemessagehandler.h"
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/common/statusmessages.h>
@@ -31,18 +32,10 @@ public:
void flush() override;
framework::Thread& getThread() override { return *_thread; }
- MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker);
-
//TODO Rewrite tests to avoid this api leak
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
+ const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
private:
uint32_t _stripeId;
ServiceLayerComponent::UP _component;
@@ -52,10 +45,9 @@ private:
MergeHandler _mergeHandler;
AsyncHandler _asyncHandler;
SplitJoinHandler _splitJoinHandler;
+ SimpleMessageHandler _simpleHandler;
framework::Thread::UP _thread;
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
-
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker);
void handleReply(api::StorageReply&);
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index ba074003b80..35842297ef9 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -171,7 +171,7 @@ PersistenceUtil::PersistenceUtil(
PersistenceUtil::~PersistenceUtil() = default;
void
-PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i)
+PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) const
{
// Update bucket database
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(),
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index e27419ccc1f..c90eee4b7ae 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -108,11 +108,11 @@ struct PersistenceUtil {
~PersistenceUtil();
- StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) {
+ StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) const {
return _component.getBucketDatabase(bucketSpace);
}
- void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info);
+ void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info) const;
uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const;
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
new file mode 100644
index 00000000000..209c3846b97
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -0,0 +1,241 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "simplemessagehandler.h"
+#include "persistenceutil.h"
+#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storage/common/bucketoperationlogger.h>
+#include <vespa/document/base/exceptions.h>
+#include <vespa/document/fieldset/fieldsetrepo.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".persistence.processall");
+
+using vespalib::make_string_short::fmt;
+using to_str = vespalib::string;
+
+namespace storage {
+
+namespace {
+
+spi::ReadConsistency
+api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept {
+ switch (consistency) {
+ case api::InternalReadConsistency::Strong:
+ return spi::ReadConsistency::STRONG;
+ case api::InternalReadConsistency::Weak:
+ return spi::ReadConsistency::WEAK;
+ default:
+ abort();
+ }
+}
+
+document::FieldSet::SP
+getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, MessageTracker & tracker) {
+ try {
+ return repo.getFieldSet(name);
+ } catch (document::FieldNotFoundException & e) {
+ tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS,
+ fmt("Field %s in fieldset %s not found in document", e.getFieldName().c_str(), to_str(name).c_str()));
+ } catch (const vespalib::Exception & e) {
+ tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS,
+ fmt("Failed parsing fieldset %s with : %s", to_str(name).c_str(), e.getMessage().c_str()));
+ }
+ return document::FieldSet::SP();
+}
+
+bool
+bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
+ // Don't check document sizes, as background moving of documents in Proton
+ // may trigger a change in size without any mutations taking place. This will
+ // only take place when a document being moved was fed _prior_ to the change
+ // where Proton starts reporting actual document sizes, and will eventually
+ // converge to a stable value. But for now, ignore it to prevent false positive
+ // error logs and non-deleted buckets.
+ return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
+}
+}
+SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
+ : _env(env),
+ _spi(spi)
+{
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) const
+{
+ auto& metrics = _env._metrics.get[cmd.getLoadType()];
+ tracker->setMetric(metrics);
+ metrics.request_size.addValue(cmd.getApproxByteSize());
+
+ auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFieldSet(), *tracker);
+ if ( ! fieldSet) { return tracker; }
+
+ tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
+ spi::GetResult result = _spi.get(_env.getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ *fieldSet, cmd.getDocumentId(), tracker->context());
+
+ if (tracker->checkForError(result)) {
+ if (!result.hasDocument() && (document::FieldSet::Type::NONE != fieldSet->getType())) {
+ metrics.notFound.inc();
+ }
+ tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp(),
+ false, result.is_tombstone()));
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]);
+ spi::Bucket b = spi::Bucket(cmd.getBucket());
+ const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
+ for (const api::Timestamp & token : tokens) {
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context());
+ }
+ return tracker;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createBuckets);
+ LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+ DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
+ }
+ spi::Bucket spiBucket(cmd.getBucket());
+ _spi.createBucket(spiBucket, tracker->context());
+ if (cmd.getActive()) {
+ _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
+ }
+ return tracker;
+}
+
+bool
+SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
+{
+ spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
+ if (result.hasError()) {
+ LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
+ bucket.toString().c_str(), result.getErrorMessage().c_str());
+ return false;
+ }
+ api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
+ // Don't check meta fields or active/ready fields since these are not
+ // that important and ready may change under the hood in a race with
+ // getModifiedBuckets(). If bucket is empty it means it has already
+ // been deleted by a racing split/join.
+ if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
+ LOG(error,
+ "Service layer bucket database and provider out of sync before "
+ "deleting bucket %s! Service layer db had %s while provider says "
+ "bucket has %s. Deletion has been rejected to ensure data is not "
+ "lost, but bucket may remain out of sync until service has been "
+ "restarted.",
+ bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
+ return false;
+ }
+ return true;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
+ LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+ _spi.deleteBucket(bucket, tracker->context());
+ StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
+ {
+ StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
+ if (entry.exist() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ cmd.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+ }
+ return tracker;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]);
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context()));
+ if (tracker->checkForError(result)) {
+ auto reply = std::make_shared<GetIterReply>(cmd);
+ reply->getEntries() = result.steal_entries();
+ _env._metrics.visit[cmd.getLoadType()].
+ documentsPerIterate.addValue(reply->getEntries().size());
+ if (result.isCompleted()) {
+ reply->setCompleted();
+ }
+ tracker->setReply(reply);
+ }
+ return tracker;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.readBucketList);
+
+ assert(cmd.getPartition() == 0u);
+ spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace()));
+ if (tracker->checkForError(result)) {
+ auto reply = std::make_shared<ReadBucketListReply>(cmd);
+ result.getList().swap(reply->getBuckets());
+ tracker->setReply(reply);
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.readBucketInfo);
+ _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
+ return tracker;
+}
+
+MessageTracker::UP
+SimpleMessageHandler::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createIterator);
+ auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFields(), *tracker);
+ if ( ! fieldSet) { return tracker; }
+
+ tracker->context().setReadConsistency(cmd.getReadConsistency());
+ spi::CreateIteratorResult result(_spi.createIterator(
+ spi::Bucket(cmd.getBucket()),
+ std::move(fieldSet), cmd.getSelection(), cmd.getIncludedVersions(), tracker->context()));
+ if (tracker->checkForError(result)) {
+ tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
+ }
+ return tracker;
+}
+
+} // storage
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
new file mode 100644
index 00000000000..887589fbf05
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "types.h"
+#include "messages.h"
+#include <vespa/storage/common/bucketmessages.h>
+#include <vespa/storageapi/message/persistence.h>
+
+namespace storage {
+
+namespace spi { struct PersistenceProvider; }
+struct PersistenceUtil;
+
+class SimpleMessageHandler : public Types {
+public:
+ SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
+ MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
+private:
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
+ const PersistenceUtil& _env;
+ spi::PersistenceProvider& _spi;
+};
+
+} // storage
+