From e6a8e2c087d25335784c05a8d01e1c1b665d3b3d Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sun, 18 Oct 2020 15:50:16 +0000 Subject: Factor out handling of the remaining messages. --- storage/src/tests/persistence/testandsettest.cpp | 5 +- .../src/vespa/storage/persistence/CMakeLists.txt | 1 + storage/src/vespa/storage/persistence/messages.h | 1 - .../storage/persistence/persistencethread.cpp | 235 +------------------- .../vespa/storage/persistence/persistencethread.h | 14 +- .../vespa/storage/persistence/persistenceutil.cpp | 2 +- .../vespa/storage/persistence/persistenceutil.h | 4 +- .../storage/persistence/simplemessagehandler.cpp | 241 +++++++++++++++++++++ .../storage/persistence/simplemessagehandler.h | 33 +++ 9 files changed, 293 insertions(+), 243 deletions(-) create mode 100644 storage/src/vespa/storage/persistence/simplemessagehandler.cpp create mode 100644 storage/src/vespa/storage/persistence/simplemessagehandler.h diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index e4c5227f951..3d0dd183232 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -251,10 +251,11 @@ TestAndSetTest::createTestDocument() return doc; } -document::Document::SP TestAndSetTest::retrieveTestDocument() +document::Document::SP +TestAndSetTest::retrieveTestDocument() { auto get = std::make_shared(BUCKET, testDocId, document::AllFields::NAME); - auto tracker = thread->handleGet(*get, createTracker(get, BUCKET)); + auto tracker = thread->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET)); assert(tracker->getResult() == api::ReturnCode::Result::OK); auto & reply = static_cast(tracker->getReply()); diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 5707c9ea687..aa22a67f747 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -11,6 +11,7 @@ vespa_add_library(storage_spersistence OBJECT persistenceutil.cpp processallhandler.cpp provider_error_wrapper.cpp + simplemessagehandler.cpp splitbitdetector.cpp splitjoinhandler.cpp testandsethelper.cpp diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index b771af18b17..d4888970732 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -6,7 +6,6 @@ #include #include #include -#include namespace storage { diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 74d5c304bfb..719c2130e1a 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -46,6 +46,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence cfg.commonMergeChainOptimalizationMinimumSize), _asyncHandler(_env, _spi, sequencedExecutor), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), + _simpleHandler(_env, provider), _thread() { _thread = _component->startThread(*this, 60s, 1s); @@ -60,230 +61,12 @@ PersistenceThread::~PersistenceThread() LOG(debug, "Persistence thread done with destruction"); } -namespace { - -spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept { - switch (consistency) { - case api::InternalReadConsistency::Strong: return spi::ReadConsistency::STRONG; - case api::InternalReadConsistency::Weak: return spi::ReadConsistency::WEAK; - default: abort(); - } -} - -document::FieldSet::SP -getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, MessageTracker & tracker) { - try { - return repo.getFieldSet(name); - } catch (document::FieldNotFoundException & e) { - tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS, - fmt("Field %s in fieldset %s not found in document", e.getFieldName().c_str(), to_str(name).c_str())); - } catch (const vespalib::Exception & e) { - tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS, - fmt("Failed parsing fieldset %s with : %s", to_str(name).c_str(), e.getMessage().c_str())); - } - return document::FieldSet::SP(); -} - -} - -MessageTracker::UP -PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) -{ - auto& metrics = _env._metrics.get[cmd.getLoadType()]; - tracker->setMetric(metrics); - metrics.request_size.addValue(cmd.getApproxByteSize()); - - auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFieldSet(), *tracker); - if ( ! fieldSet) { return tracker; } - - tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); - spi::GetResult result = _spi.get(_env.getBucket(cmd.getDocumentId(), cmd.getBucket()), - *fieldSet, cmd.getDocumentId(), tracker->context()); - - if (tracker->checkForError(result)) { - if (!result.hasDocument() && (document::FieldSet::Type::NONE != fieldSet->getType())) { - metrics.notFound.inc(); - } - tracker->setReply(std::make_shared(cmd, result.getDocumentPtr(), result.getTimestamp(), - false, result.is_tombstone())); - } - - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]); - spi::Bucket b = spi::Bucket(cmd.getBucket()); - const std::vector & tokens = cmd.getRevertTokens(); - for (const api::Timestamp & token : tokens) { - spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context()); - } - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.createBuckets); - LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); - } - spi::Bucket spiBucket(cmd.getBucket()); - _spi.createBucket(spiBucket, tracker->context()); - if (cmd.getActive()) { - _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); - } - return tracker; -} - -namespace { - -bool bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { - // Don't check document sizes, as background moving of documents in Proton - // may trigger a change in size without any mutations taking place. This will - // only take place when a document being moved was fed _prior_ to the change - // where Proton starts reporting actual document sizes, and will eventually - // converge to a stable value. But for now, ignore it to prevent false positive - // error logs and non-deleted buckets. - return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); -} - -} - -bool -PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const -{ - spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); - if (result.hasError()) { - LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", - bucket.toString().c_str(), result.getErrorMessage().c_str()); - return false; - } - api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); - // Don't check meta fields or active/ready fields since these are not - // that important and ready may change under the hood in a race with - // getModifiedBuckets(). If bucket is empty it means it has already - // been deleted by a racing split/join. - if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { - LOG(error, - "Service layer bucket database and provider out of sync before " - "deleting bucket %s! Service layer db had %s while provider says " - "bucket has %s. Deletion has been rejected to ensure data is not " - "lost, but bucket may remain out of sync until service has been " - "restarted.", - bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); - return false; - } - return true; -} - -MessageTracker::UP -PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.deleteBuckets); - LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); - LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); - } - spi::Bucket bucket(cmd.getBucket()); - if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { - return tracker; - } - _spi.deleteBucket(bucket, tracker->context()); - StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); - { - StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); - if (entry.exist() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - cmd.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } - } - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context())); - if (tracker->checkForError(result)) { - auto reply = std::make_shared(cmd); - reply->getEntries() = result.steal_entries(); - _env._metrics.visit[cmd.getLoadType()]. - documentsPerIterate.addValue(reply->getEntries().size()); - if (result.isCompleted()) { - reply->setCompleted(); - } - tracker->setReply(reply); - } - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.readBucketList); - - assert(cmd.getPartition() == 0u); - spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace())); - if (tracker->checkForError(result)) { - auto reply = std::make_shared(cmd); - result.getList().swap(reply->getBuckets()); - tracker->setReply(reply); - } - - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.readBucketInfo); - _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.createIterator); - auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFields(), *tracker); - if ( ! fieldSet) { return tracker; } - - tracker->context().setReadConsistency(cmd.getReadConsistency()); - spi::CreateIteratorResult result(_spi.createIterator( - spi::Bucket(cmd.getBucket()), - std::move(fieldSet), cmd.getSelection(), cmd.getIncludedVersions(), tracker->context())); - if (tracker->checkForError(result)) { - tracker->setReply(std::make_shared(cmd, spi::IteratorId(result.getIteratorId()))); - } - return tracker; -} - MessageTracker::UP PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) { switch (msg.getType().getId()) { case api::MessageType::GET_ID: - return handleGet(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleGet(static_cast(msg), std::move(tracker)); case api::MessageType::PUT_ID: return _asyncHandler.handlePut(static_cast(msg), std::move(tracker)); case api::MessageType::REMOVE_ID: @@ -291,11 +74,11 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case api::MessageType::UPDATE_ID: return _asyncHandler.handleUpdate(static_cast(msg), std::move(tracker)); case api::MessageType::REVERT_ID: - return handleRevert(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleRevert(static_cast(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return handleCreateBucket(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleCreateBucket(static_cast(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return handleDeleteBucket(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleDeleteBucket(static_cast(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: return _splitJoinHandler.handleJoinBuckets(static_cast(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: @@ -316,13 +99,13 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case api::MessageType::INTERNAL_ID: switch(static_cast(msg).getType()) { case GetIterCommand::ID: - return handleGetIter(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleGetIter(static_cast(msg), std::move(tracker)); case CreateIteratorCommand::ID: - return handleCreateIterator(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleCreateIterator(static_cast(msg), std::move(tracker)); case ReadBucketList::ID: - return handleReadBucketList(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleReadBucketList(static_cast(msg), std::move(tracker)); case ReadBucketInfo::ID: - return handleReadBucketInfo(static_cast(msg), std::move(tracker)); + return _simpleHandler.handleReadBucketInfo(static_cast(msg), std::move(tracker)); case InternalBucketJoinCommand::ID: return _splitJoinHandler.handleInternalBucketJoin(static_cast(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 48223afd405..917812868a7 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -9,6 +9,7 @@ #include "persistenceutil.h" #include "provider_error_wrapper.h" #include "splitjoinhandler.h" +#include "simplemessagehandler.h" #include #include #include @@ -31,18 +32,10 @@ public: void flush() override; framework::Thread& getThread() override { return *_thread; } - MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker); - //TODO Rewrite tests to avoid this api leak const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } + const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } private: uint32_t _stripeId; ServiceLayerComponent::UP _component; @@ -52,10 +45,9 @@ private: MergeHandler _mergeHandler; AsyncHandler _asyncHandler; SplitJoinHandler _splitJoinHandler; + SimpleMessageHandler _simpleHandler; framework::Thread::UP _thread; - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; - // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker); void handleReply(api::StorageReply&); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index ba074003b80..35842297ef9 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -171,7 +171,7 @@ PersistenceUtil::PersistenceUtil( PersistenceUtil::~PersistenceUtil() = default; void -PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) +PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) const { // Update bucket database StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index e27419ccc1f..c90eee4b7ae 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -108,11 +108,11 @@ struct PersistenceUtil { ~PersistenceUtil(); - StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) { + StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) const { return _component.getBucketDatabase(bucketSpace); } - void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info); + void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info) const; uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const; diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp new file mode 100644 index 00000000000..209c3846b97 --- /dev/null +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -0,0 +1,241 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "simplemessagehandler.h" +#include "persistenceutil.h" +#include +#include +#include +#include + +#include +LOG_SETUP(".persistence.processall"); + +using vespalib::make_string_short::fmt; +using to_str = vespalib::string; + +namespace storage { + +namespace { + +spi::ReadConsistency +api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept { + switch (consistency) { + case api::InternalReadConsistency::Strong: + return spi::ReadConsistency::STRONG; + case api::InternalReadConsistency::Weak: + return spi::ReadConsistency::WEAK; + default: + abort(); + } +} + +document::FieldSet::SP +getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, MessageTracker & tracker) { + try { + return repo.getFieldSet(name); + } catch (document::FieldNotFoundException & e) { + tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS, + fmt("Field %s in fieldset %s not found in document", e.getFieldName().c_str(), to_str(name).c_str())); + } catch (const vespalib::Exception & e) { + tracker.fail(storage::api::ReturnCode::ILLEGAL_PARAMETERS, + fmt("Failed parsing fieldset %s with : %s", to_str(name).c_str(), e.getMessage().c_str())); + } + return document::FieldSet::SP(); +} + +bool +bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { + // Don't check document sizes, as background moving of documents in Proton + // may trigger a change in size without any mutations taking place. This will + // only take place when a document being moved was fed _prior_ to the change + // where Proton starts reporting actual document sizes, and will eventually + // converge to a stable value. But for now, ignore it to prevent false positive + // error logs and non-deleted buckets. + return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); +} +} +SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi) + : _env(env), + _spi(spi) +{ +} + +MessageTracker::UP +SimpleMessageHandler::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) const +{ + auto& metrics = _env._metrics.get[cmd.getLoadType()]; + tracker->setMetric(metrics); + metrics.request_size.addValue(cmd.getApproxByteSize()); + + auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFieldSet(), *tracker); + if ( ! fieldSet) { return tracker; } + + tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); + spi::GetResult result = _spi.get(_env.getBucket(cmd.getDocumentId(), cmd.getBucket()), + *fieldSet, cmd.getDocumentId(), tracker->context()); + + if (tracker->checkForError(result)) { + if (!result.hasDocument() && (document::FieldSet::Type::NONE != fieldSet->getType())) { + metrics.notFound.inc(); + } + tracker->setReply(std::make_shared(cmd, result.getDocumentPtr(), result.getTimestamp(), + false, result.is_tombstone())); + } + + return tracker; +} + +MessageTracker::UP +SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]); + spi::Bucket b = spi::Bucket(cmd.getBucket()); + const std::vector & tokens = cmd.getRevertTokens(); + for (const api::Timestamp & token : tokens) { + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context()); + } + return tracker; +} + +MessageTracker::UP +SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.createBuckets); + LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); + DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); + } + spi::Bucket spiBucket(cmd.getBucket()); + _spi.createBucket(spiBucket, tracker->context()); + if (cmd.getActive()) { + _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); + } + return tracker; +} + +bool +SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const +{ + spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); + if (result.hasError()) { + LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", + bucket.toString().c_str(), result.getErrorMessage().c_str()); + return false; + } + api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); + // Don't check meta fields or active/ready fields since these are not + // that important and ready may change under the hood in a race with + // getModifiedBuckets(). If bucket is empty it means it has already + // been deleted by a racing split/join. + if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { + LOG(error, + "Service layer bucket database and provider out of sync before " + "deleting bucket %s! Service layer db had %s while provider says " + "bucket has %s. Deletion has been rejected to ensure data is not " + "lost, but bucket may remain out of sync until service has been " + "restarted.", + bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); + return false; + } + return true; +} + +MessageTracker::UP +SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); + LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { + return tracker; + } + _spi.deleteBucket(bucket, tracker->context()); + StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); + { + StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); + if (entry.exist() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + cmd.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } + } + return tracker; +} + +MessageTracker::UP +SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context())); + if (tracker->checkForError(result)) { + auto reply = std::make_shared(cmd); + reply->getEntries() = result.steal_entries(); + _env._metrics.visit[cmd.getLoadType()]. + documentsPerIterate.addValue(reply->getEntries().size()); + if (result.isCompleted()) { + reply->setCompleted(); + } + tracker->setReply(reply); + } + return tracker; +} + +MessageTracker::UP +SimpleMessageHandler::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.readBucketList); + + assert(cmd.getPartition() == 0u); + spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace())); + if (tracker->checkForError(result)) { + auto reply = std::make_shared(cmd); + result.getList().swap(reply->getBuckets()); + tracker->setReply(reply); + } + + return tracker; +} + +MessageTracker::UP +SimpleMessageHandler::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.readBucketInfo); + _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); + return tracker; +} + +MessageTracker::UP +SimpleMessageHandler::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.createIterator); + auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFields(), *tracker); + if ( ! fieldSet) { return tracker; } + + tracker->context().setReadConsistency(cmd.getReadConsistency()); + spi::CreateIteratorResult result(_spi.createIterator( + spi::Bucket(cmd.getBucket()), + std::move(fieldSet), cmd.getSelection(), cmd.getIncludedVersions(), tracker->context())); + if (tracker->checkForError(result)) { + tracker->setReply(std::make_shared(cmd, spi::IteratorId(result.getIteratorId()))); + } + return tracker; +} + +} // storage diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h new file mode 100644 index 00000000000..887589fbf05 --- /dev/null +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "types.h" +#include "messages.h" +#include +#include + +namespace storage { + +namespace spi { struct PersistenceProvider; } +struct PersistenceUtil; + +class SimpleMessageHandler : public Types { +public: + SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&); + MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const; +private: + bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; + const PersistenceUtil& _env; + spi::PersistenceProvider& _spi; +}; + +} // storage + -- cgit v1.2.3