From e59348551c200c15f5dc4158274d96b57b91227f Mon Sep 17 00:00:00 2001
From: Henning Baldersheim
Date: Thu, 21 Oct 2021 16:54:10 +0000
Subject: Async createBucket

---
 .../persistence/dummyimpl/dummypersistence.cpp | 6 +-
 .../vespa/persistence/dummyimpl/dummypersistence.h | 2 +-
 .../persistence/spi/abstractpersistenceprovider.h | 1 -
 .../vespa/persistence/spi/persistenceprovider.cpp | 8 +
 .../vespa/persistence/spi/persistenceprovider.h | 3 +-
 .../proton/persistenceengine/persistenceengine.cpp | 18 +-
 .../proton/persistenceengine/persistenceengine.h | 2 +-
 .../common/persistenceproviderwrapper.cpp | 8 +-
 .../common/persistenceproviderwrapper.h | 2 +-
 .../filestorage/operationabortingtest.cpp | 4 +-
 .../src/vespa/storage/persistence/asynchandler.cpp | 28 +++
 .../src/vespa/storage/persistence/asynchandler.h | 1 +
 .../src/vespa/storage/persistence/mergehandler.cpp | 265 +++++++++++----------
 .../src/vespa/storage/persistence/mergehandler.h | 1 +
 .../storage/persistence/persistencehandler.cpp | 2 +-
 .../storage/persistence/provider_error_wrapper.cpp | 11 +-
 .../storage/persistence/provider_error_wrapper.h | 2 +-
 .../storage/persistence/simplemessagehandler.cpp | 16 --
 .../storage/persistence/simplemessagehandler.h | 1 -
 19 files changed, 206 insertions(+), 175 deletions(-)

diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 6e4f38fe564..b8a390ed0ce 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -714,8 +714,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&)
     return Result();
 }
 
-Result
-DummyPersistence::createBucket(const Bucket& b, Context&)
+void
+DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
 {
     DUMMYPERSISTENCE_VERIFY_INITIALIZED;
     LOG(debug, "createBucket(%s)", b.toString().c_str());
@@ -727,7 +727,7 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
         assert(!_content[b]->_inUse);
         LOG(debug, "%s already existed", b.toString().c_str());
     }
-    return Result();
+    onComplete->onComplete(std::make_unique<Result>());
 }
 
 void
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 99d6ba717b7..2ab97b0b403 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -168,7 +168,7 @@ public:
     IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
     Result destroyIterator(IteratorId, Context&) override;
 
-    Result createBucket(const Bucket&, Context&) override;
+    void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
     void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
 
     Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index e287bdc5252..3b59f20ca96 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider
 {
 public:
     Result initialize() override { return Result(); };
-    Result createBucket(const Bucket&, Context&) override { return Result(); }
     Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
     void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
     Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); }
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 3ea476c33fc..31db08a6f4f 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -16,6 +16,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat
     return *future.get();
 }
 
+Result
+PersistenceProvider::createBucket(const Bucket& bucket, Context& context) {
+    auto catcher = std::make_unique<CatchResult>();
+    auto future = catcher->future_result();
+    createBucketAsync(bucket, context, std::move(catcher));
+    return *future.get();
+}
+
 Result
 PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) {
     auto catcher = std::make_unique<CatchResult>();
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 83eb042d855..09a752d4ded 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -58,6 +58,7 @@ struct PersistenceProvider
     virtual ~PersistenceProvider();
 
     // TODO Move to utility class for use in tests only
+    Result createBucket(const Bucket&, Context&);
     Result deleteBucket(const Bucket&, Context&);
     Result put(const Bucket&, Timestamp, DocumentSP, Context&);
     Result setActiveState(const Bucket&, BucketInfo::ActiveState);
@@ -336,7 +337,7 @@ struct PersistenceProvider
      * Tells the provider that the given bucket has been created in the
      * service layer. There is no requirement to do anything here.
      */
-    virtual Result createBucket(const Bucket&, Context&) = 0;
+    virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
 
     /**
     * Deletes the given bucket and all entries contained in that bucket.
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 114292d055d..952c2218140 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -548,19 +548,23 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) } -Result -PersistenceEngine::createBucket(const Bucket &b, Context &) +void +PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleCreateBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleCreateBucket(feedtoken::make(transportContext), b); + } else { + handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 94331ac2cd6..9cabebc8135 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -114,7 +114,7 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket &bucketId, Context &) override ; + void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) override; void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index b3bd1c6a253..302e603a9d8 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const return _spi.listBuckets(bucketSpace); } -spi::Result -PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { LOG_SPI("createBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET); - return _spi.createBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); + return _spi.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketInfoResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index c6628814dba..3cb7b92356b 100644 
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
 
     void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
                              spi::OperationComplete::UP up) override;
-    spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
+    void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
     spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
     spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
     void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 07d2b24d536..75d9b595c4f 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,9 +62,9 @@ public:
         return PersistenceProviderWrapper::getBucketInfo(bucket);
     }
 
-    spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+    void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
         ++_createBucketInvocations;
-        return PersistenceProviderWrapper::createBucket(bucket, ctx);
+        PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
     }
 
     void
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 47b5e4f5f27..b3e9d596c76 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -153,6 +153,34 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
     return trackerUP;
 }
 
+MessageTracker::UP
+AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+    tracker->setMetric(_env._metrics.createBuckets);
+    LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+    if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+        LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+    }
+    spi::Bucket bucket(cmd.getBucket());
+    auto task = makeResultTask([this, tracker = std::move(tracker), bucket, active=cmd.getActive()](spi::Result::UP ignored) mutable {
+        // TODO Even if a non-OK response cannot be handled sanely, we should probably log a message or increment a metric
+        (void) ignored;
+        if (active) {
+            auto nestedTask = makeResultTask([tracker = std::move(tracker)](spi::Result::UP nestedIgnored) {
+                // TODO Even if a non-OK response cannot be handled sanely, we should probably log a message or increment a metric
+                (void) nestedIgnored;
+                tracker->sendReply();
+            });
+            _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(nestedTask)));
+        } else {
+            tracker->sendReply();
+        }
+    });
+    _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+
+    return tracker;
+}
+
 MessageTracker::UP
 AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
 {
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4f5c242570c..db5a77bfb59 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -30,6 +30,7 @@ public:
     MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
     MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
     MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+    MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
     static bool is_async_message(api::MessageType::Id type_id) noexcept;
 private:
     bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 77e7762ec9a..2f19b8695a4 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -12,7 +12,6 @@
 #include
 #include
 #include
-#include
 #include
 
 LOG_SETUP(".persistence.mergehandler");
@@ -663,24 +662,25 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
 }
 
 namespace {
-    void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
-                        uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
-    {
-        for (const auto& entry : status.diff) {
-            uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
-            if ((entry_has_mask == 0u) ||
-                (constrictHasMask && (entry_has_mask != hasMask))) {
-                continue;
-            }
-            cmd.getDiff().emplace_back(entry);
-            if (constrictHasMask) {
-                cmd.getDiff().back()._entry._hasMask = newHasMask;
-            } else {
-                
cmd.getDiff().back()._entry._hasMask = entry_has_mask; } } } +} api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, @@ -938,141 +938,146 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP namespace { - uint8_t findOwnIndex( - const std::vector& nodeList, - uint16_t us) - { - for (uint32_t i=0, n=nodeList.size(); i& nodeList, + uint16_t us) +{ + for (uint32_t i=0, n=nodeList.size(); i - { - bool operator()(const api::GetBucketDiffCommand::Entry& x, - const api::GetBucketDiffCommand::Entry& y) const { - return (x._timestamp < y._timestamp); - } - }; - - /** - * Merges list A and list B together and puts the result in result. - * Result is swapped in as last step to keep function exception safe. Thus - * result can be listA or listB if wanted. - * - * listA and listB are assumed to be in the order found in the slotfile, or - * in the order given by a previous call to this function. (In both cases - * this will be sorted by timestamp) - * - * @return false if any suspect entries was found. - */ - bool mergeLists( - const std::vector& listA, - const std::vector& listB, - std::vector& finalResult) - { - bool suspect = false; - std::vector result; - uint32_t i = 0, j = 0; - while (i < listA.size() && j < listB.size()) { - const api::GetBucketDiffCommand::Entry& a(listA[i]); - const api::GetBucketDiffCommand::Entry& b(listB[j]); - if (a._timestamp < b._timestamp) { - result.push_back(a); - ++i; - } else if (a._timestamp > b._timestamp) { - result.push_back(b); - ++j; - } else { - // If we find equal timestamped entries that are not the - // same.. Flag an error. But there is nothing we can do - // about it. Note it as if it is the same entry so we - // dont try to merge them. - if (!(a == b)) { - if (a._gid == b._gid && a._flags == b._flags) { - if ((a._flags & getDeleteFlag()) != 0 && - (b._flags & getDeleteFlag()) != 0) - { - // Unfortunately this can happen, for instance - // if a remove comes to a bucket out of sync - // and reuses different headers in the two - // versions. - LOG(debug, "Found entries with equal timestamps of " - "the same gid who both are remove " - "entries: %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } else { - LOG(error, "Found entries with equal timestamps of " - "the same gid. This is likely same " - "document where size of document varies:" - " %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; - } else if ((a._flags & getDeleteFlag()) - != (b._flags & getDeleteFlag())) +/** + * Merges list A and list B together and puts the result in result. + * Result is swapped in as last step to keep function exception safe. Thus + * result can be listA or listB if wanted. + * + * listA and listB are assumed to be in the order found in the slotfile, or + * in the order given by a previous call to this function. (In both cases + * this will be sorted by timestamp) + * + * @return false if any suspect entries was found. 
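+ * A suspect entry is one where both lists contain the same timestamp but
+ * different content, i.e. the replicas disagree about that slot.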
+ */ +bool mergeLists( + const std::vector& listA, + const std::vector& listB, + std::vector& finalResult) +{ + bool suspect = false; + std::vector result; + uint32_t i = 0, j = 0; + while (i < listA.size() && j < listB.size()) { + const api::GetBucketDiffCommand::Entry& a(listA[i]); + const api::GetBucketDiffCommand::Entry& b(listB[j]); + if (a._timestamp < b._timestamp) { + result.push_back(a); + ++i; + } else if (a._timestamp > b._timestamp) { + result.push_back(b); + ++j; + } else { + // If we find equal timestamped entries that are not the + // same.. Flag an error. But there is nothing we can do + // about it. Note it as if it is the same entry so we + // dont try to merge them. + if (!(a == b)) { + if (a._gid == b._gid && a._flags == b._flags) { + if ((a._flags & getDeleteFlag()) != 0 && + (b._flags & getDeleteFlag()) != 0) { - // If we find one remove and one put entry on the - // same timestamp we are going to keep the remove - // entry to make the copies consistent. - const api::GetBucketDiffCommand::Entry& deletedEntry( - (a._flags & getDeleteFlag()) != 0 ? a : b); - result.push_back(deletedEntry); - LOG(debug, - "Found put and remove on same timestamp. Keeping" - "remove as it is likely caused by remove with " - "copies unavailable at the time: %s, %s.", - a.toString().c_str(), b.toString().c_str()); + // Unfortunately this can happen, for instance + // if a remove comes to a bucket out of sync + // and reuses different headers in the two + // versions. + LOG(debug, "Found entries with equal timestamps of " + "the same gid who both are remove " + "entries: %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } else { - LOG(error, "Found entries with equal timestamps that " - "weren't the same entry: %s, %s.", - a.toString().c_str(), b.toString().c_str()); - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; + LOG(error, "Found entries with equal timestamps of " + "the same gid. This is likely same " + "document where size of document varies:" + " %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } + result.push_back(a); + result.back()._hasMask |= b._hasMask; + suspect = true; + } else if ((a._flags & getDeleteFlag()) + != (b._flags & getDeleteFlag())) + { + // If we find one remove and one put entry on the + // same timestamp we are going to keep the remove + // entry to make the copies consistent. + const api::GetBucketDiffCommand::Entry& deletedEntry( + (a._flags & getDeleteFlag()) != 0 ? a : b); + result.push_back(deletedEntry); + LOG(debug, + "Found put and remove on same timestamp. 
Keeping" + "remove as it is likely caused by remove with " + "copies unavailable at the time: %s, %s.", + a.toString().c_str(), b.toString().c_str()); } else { + LOG(error, "Found entries with equal timestamps that " + "weren't the same entry: %s, %s.", + a.toString().c_str(), b.toString().c_str()); result.push_back(a); result.back()._hasMask |= b._hasMask; + suspect = true; } - ++i; - ++j; + } else { + result.push_back(a); + result.back()._hasMask |= b._hasMask; } + ++i; + ++j; } - if (i < listA.size()) { - assert(j >= listB.size()); - for (uint32_t n = listA.size(); i= listA.size()); - for (uint32_t n = listB.size(); j= listB.size()); + for (uint32_t n = listA.size(); i= listA.size()); + for (uint32_t n = listB.size(); j result) override { + checkResult(*result, _bucket, _msg); + } + void addResultHandler(const spi::ResultHandler *) override { } +}; } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const -{ +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(bucket, "create bucket")); + return handleGetBucketDiffStage2(cmd, std::move(tracker)); +} +MessageTracker::UP +MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const +{ + spi::Bucket bucket(cmd.getBucket()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 17cfb847d2c..e6b4d047209 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -86,6 +86,7 @@ private: const uint32_t _commonMergeChainOptimalizationMinimumSize; std::atomic _async_apply_bucket_diff; + MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index aa1a9c136fd..761021a9612 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::REVERT_ID: return _simpleHandler.handleRevert(static_cast(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return _simpleHandler.handleCreateBucket(static_cast(msg), std::move(tracker)); + return _asyncHandler.handleCreateBucket(static_cast(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: return _asyncHandler.handleDeleteBucket(static_cast(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ce424f0ce83..5c27d112a7c 100644 --- 
a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& return checkResult(_impl.destroyIterator(iteratorId, context)); } -spi::Result -ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.createBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } void -ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) +ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); + _impl.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index c9d2411e372..e999f33f2bd 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -49,7 +49,6 @@ public: spi::Context &context) override; spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -63,6 +62,7 @@ public: void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; std::unique_ptr register_executor(std::shared_ptr executor) override; private: diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4fe207e2e5..9a7a451b906 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -87,22 +87,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t return tracker; } -MessageTracker::UP -SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.createBuckets); - LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if 
(_env._fileStorHandler.isMerging(cmd.getBucket())) {
-        LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
-    }
-    spi::Bucket spiBucket(cmd.getBucket());
-    _spi.createBucket(spiBucket, tracker->context());
-    if (cmd.getActive()) {
-        _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
-    }
-    return tracker;
-}
-
 MessageTracker::UP
 SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
 {
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 2cfbc7016c0..009fd6dff52 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -22,7 +22,6 @@ public:
     SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
     MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
     MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
-    MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
    MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
     MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
     MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
-- 
cgit v1.2.3


From 79cbce284155baf5b85fc1ae257fe8f2b24a6743 Mon Sep 17 00:00:00 2001
From: Henning Baldersheim
Date: Thu, 21 Oct 2021 20:08:00 +0000
Subject: No need to wait for the result of async createBucket, as the result
 is ignored anyway and no later operation depends on the outcome. In addition,
 they are sequenced, as the master executor conducts all operations in FIFO
 order.

---
 .../src/vespa/storage/persistence/asynchandler.cpp | 26 ++++++++++++----------
 .../src/vespa/storage/persistence/mergehandler.cpp | 25 ++++++++++++---------
 2 files changed, 28 insertions(+), 23 deletions(-)

diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index b3e9d596c76..5da2b65b7ff 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -91,6 +91,11 @@ private:
     vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
 };
 
+struct Noop : public spi::OperationComplete {
+    void onComplete(std::unique_ptr<spi::Result>) override { }
+    void addResultHandler(const spi::ResultHandler *) override { }
+};
+
 bool
 bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
     // Don't check document sizes, as background moving of documents in Proton
@@ -162,21 +167,18 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::
         LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
     }
     spi::Bucket bucket(cmd.getBucket());
-    auto task = makeResultTask([this, tracker = std::move(tracker), bucket, active=cmd.getActive()](spi::Result::UP ignored) mutable {
+    auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable {
         // TODO Even if a non-OK response cannot be handled sanely, we should probably log a message or increment a metric
         (void) ignored;
-        if (active) {
-            auto nestedTask = makeResultTask([tracker = std::move(tracker)](spi::Result::UP nestedIgnored) {
-                // TODO Even if a non-OK response cannot be handled sanely, we should probably log a message or increment a metric
-                (void) nestedIgnored;
-                tracker->sendReply();
-            });
-            _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(nestedTask)));
-        } else {
-            tracker->sendReply();
-        }
+        tracker->sendReply();
     });
-    _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+
+    if (cmd.getActive()) {
+        _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<Noop>());
+        _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+    } else {
+        _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+    }
 
     return tracker;
 }
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 2f19b8695a4..7727a591af7 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -662,6 +662,7 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
 }
 
 namespace {
+
 void
 findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
                uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
@@ -680,6 +681,17 @@ findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHa
         }
     }
 }
+
+struct CheckResult : public spi::OperationComplete {
+    spi::Bucket _bucket;
+    const char *_msg;
+    CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { }
+    void onComplete(std::unique_ptr<spi::Result> result) override {
+        checkResult(*result, _bucket, _msg);
+    }
+    void addResultHandler(const spi::ResultHandler *) override { }
+};
+
 }
 
 api::StorageReply::SP
@@ -898,7 +910,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
         tracker->fail(api::ReturnCode::BUSY, err);
         return tracker;
     }
-    checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+    _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<CheckResult>(bucket, "create bucket"));
+
     MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
 
     auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel());
@@ -1053,16 +1066,6 @@ bool mergeLists(
     return !suspect;
 }
 
-struct CheckResult : public spi::OperationComplete {
-    spi::Bucket _bucket;
-    const char *_msg;
-    CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { }
-    void onComplete(std::unique_ptr<spi::Result> result) override {
-        checkResult(*result, _bucket, _msg);
-    }
-    void addResultHandler(const spi::ResultHandler *) override { }
-};
-
 }
 
 MessageTracker::UP
-- 
cgit v1.2.3


From 7c8476d08bbf76b7c2ba05911b9cd42fe32dd448 Mon Sep 17 00:00:00 2001
From: Henning Baldersheim
Date: Mon, 25 Oct 2021 11:44:28 +0000
Subject: 
create/delete bucket will never throw. --- .../persistence/dummyimpl/dummypersistence.cpp | 478 ++++++++++----------- .../vespa/persistence/dummyimpl/dummypersistence.h | 4 +- .../src/vespa/persistence/spi/catchresult.h | 5 + .../vespa/persistence/spi/persistenceprovider.h | 4 +- .../proton/persistenceengine/persistenceengine.cpp | 4 +- .../proton/persistenceengine/persistenceengine.h | 4 +- .../common/persistenceproviderwrapper.cpp | 6 +- .../common/persistenceproviderwrapper.h | 4 +- .../filestorage/operationabortingtest.cpp | 4 +- storage/src/tests/persistence/mergehandlertest.cpp | 2 - .../src/vespa/storage/persistence/asynchandler.cpp | 8 +- .../src/vespa/storage/persistence/mergehandler.cpp | 15 +- .../storage/persistence/provider_error_wrapper.cpp | 4 +- .../storage/persistence/provider_error_wrapper.h | 4 +- 14 files changed, 264 insertions(+), 282 deletions(-) diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index b8a390ed0ce..944217796ab 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -27,109 +27,108 @@ using document::FixedBucketSpaces; namespace storage::spi::dummy { -BucketContent::BucketContent() - : _entries(), - _gidMap(), - _info(), - _inUse(false), - _outdatedInfo(true), - _active(false) -{ } -BucketContent::~BucketContent() = default; - -uint32_t -BucketContent::computeEntryChecksum(const BucketEntry& e) const -{ - vespalib::crc_32_type checksummer; - - uint64_t ts(e.entry->getTimestamp()); - checksummer.process_bytes(&e.gid, sizeof(GlobalId)); - checksummer.process_bytes(&ts, sizeof(uint64_t)); - return checksummer.checksum(); -} - -BucketChecksum -BucketContent::updateRollingChecksum(uint32_t entryChecksum) -{ - uint32_t checksum = _info.getChecksum(); - checksum ^= entryChecksum; - if (checksum == 0) { - checksum = 1; + BucketContent::BucketContent() + : _entries(), + _gidMap(), + _info(), + _inUse(false), + _outdatedInfo(true), + _active(false) {} + + BucketContent::~BucketContent() = default; + + uint32_t + BucketContent::computeEntryChecksum(const BucketEntry &e) const { + vespalib::crc_32_type checksummer; + + uint64_t ts(e.entry->getTimestamp()); + checksummer.process_bytes(&e.gid, sizeof(GlobalId)); + checksummer.process_bytes(&ts, sizeof(uint64_t)); + return checksummer.checksum(); + } + + BucketChecksum + BucketContent::updateRollingChecksum(uint32_t entryChecksum) { + uint32_t checksum = _info.getChecksum(); + checksum ^= entryChecksum; + if (checksum == 0) { + checksum = 1; + } + return BucketChecksum(checksum); } - return BucketChecksum(checksum); -} -const BucketInfo& -BucketContent::getBucketInfo() const -{ - if (!_outdatedInfo) { - return _info; - } + const BucketInfo & + BucketContent::getBucketInfo() const { + if (!_outdatedInfo) { + return _info; + } - // Checksum should only depend on the newest entry for each document that - // has not been removed. - uint32_t unique = 0; - uint32_t uniqueSize = 0; - uint32_t totalSize = 0; - uint32_t checksum = 0; + // Checksum should only depend on the newest entry for each document that + // has not been removed. 
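+        // A single pass below tallies entry counts and sizes, and XORs together the
+        // per-entry checksums of the newest non-removed entry for each GID.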
+ uint32_t unique = 0; + uint32_t uniqueSize = 0; + uint32_t totalSize = 0; + uint32_t checksum = 0; - for (const BucketEntry & bucketEntry : _entries) { - const DocEntry& entry(*bucketEntry.entry); - const GlobalId& gid(bucketEntry.gid); + for (const BucketEntry &bucketEntry: _entries) { + const DocEntry &entry(*bucketEntry.entry); + const GlobalId &gid(bucketEntry.gid); - GidMapType::const_iterator gidIt(_gidMap.find(gid)); - assert(gidIt != _gidMap.end()); + GidMapType::const_iterator gidIt(_gidMap.find(gid)); + assert(gidIt != _gidMap.end()); - totalSize += entry.getSize(); - if (entry.isRemove()) { - continue; + totalSize += entry.getSize(); + if (entry.isRemove()) { + continue; + } + // Only include if we're newest entry for the particular GID + if (gidIt->second.get() != &entry) { + continue; + } + ++unique; + uniqueSize += entry.getSize(); + + checksum ^= computeEntryChecksum(bucketEntry); } - // Only include if we're newest entry for the particular GID - if (gidIt->second.get() != &entry) { - continue; + if (!unique) { + checksum = 0; + } else if (checksum == 0) { + checksum = 1; } - ++unique; - uniqueSize += entry.getSize(); - checksum ^= computeEntryChecksum(bucketEntry); - } - if (!unique) { - checksum = 0; - } else if (checksum == 0) { - checksum = 1; - } + _info = BucketInfo(BucketChecksum(checksum), + unique, + uniqueSize, + _entries.size(), + totalSize, + BucketInfo::READY, + _active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE); - _info = BucketInfo(BucketChecksum(checksum), - unique, - uniqueSize, - _entries.size(), - totalSize, - BucketInfo::READY, - _active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE); + _outdatedInfo = false; + return _info; + } - _outdatedInfo = false; - return _info; -} + namespace { -namespace { + struct TimestampLess { + bool operator()(const BucketEntry &bucketEntry, Timestamp t) { + return bucketEntry.entry->getTimestamp() < t; + } -struct TimestampLess { - bool operator()(const BucketEntry &bucketEntry, Timestamp t) - { return bucketEntry.entry->getTimestamp() < t; } - bool operator()(Timestamp t, const BucketEntry &bucketEntry) - { return t < bucketEntry.entry->getTimestamp(); } -}; + bool operator()(Timestamp t, const BucketEntry &bucketEntry) { + return t < bucketEntry.entry->getTimestamp(); + } + }; -} // namespace + } // namespace -bool -BucketContent::hasTimestamp(Timestamp t) const -{ - if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) { - return false; + bool + BucketContent::hasTimestamp(Timestamp t) const { + if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) { + return false; + } + return binary_search(_entries.begin(), _entries.end(), t, TimestampLess()); } - return binary_search(_entries.begin(), _entries.end(), t, TimestampLess()); -} /** * GID map semantics: @@ -147,181 +146,174 @@ BucketContent::hasTimestamp(Timestamp t) const * document), we can remove the mapping entirely. 
*/ -void -BucketContent::insert(DocEntry::SP e) -{ - LOG(spam, "insert(%s)", e->toString().c_str()); - const DocumentId* docId(e->getDocumentId()); - assert(docId != 0); - GlobalId gid(docId->getGlobalId()); - GidMapType::iterator gidIt(_gidMap.find(gid)); - - if (!_entries.empty() && - _entries.back().entry->getTimestamp() < e->getTimestamp()) { - _entries.push_back(BucketEntry(e, gid)); - } else { - std::vector::iterator it = - lower_bound(_entries.begin(), - _entries.end(), - e->getTimestamp(), - TimestampLess()); - if (it != _entries.end()) { - if (it->entry->getTimestamp() == e->getTimestamp()) { - if (*it->entry.get() == *e) { - LOG(debug, "Ignoring duplicate put entry %s", - e->toString().c_str()); - return; - } else { - LOG(error, "Entry %s was already present." - "Was trying to insert %s.", - it->entry->toString().c_str(), - e->toString().c_str()); - LOG_ABORT("should not reach here"); + void + BucketContent::insert(DocEntry::SP e) { + LOG(spam, "insert(%s)", e->toString().c_str()); + const DocumentId *docId(e->getDocumentId()); + assert(docId != 0); + GlobalId gid(docId->getGlobalId()); + GidMapType::iterator gidIt(_gidMap.find(gid)); + + if (!_entries.empty() && + _entries.back().entry->getTimestamp() < e->getTimestamp()) { + _entries.push_back(BucketEntry(e, gid)); + } else { + std::vector::iterator it = + lower_bound(_entries.begin(), + _entries.end(), + e->getTimestamp(), + TimestampLess()); + if (it != _entries.end()) { + if (it->entry->getTimestamp() == e->getTimestamp()) { + if (*it->entry.get() == *e) { + LOG(debug, "Ignoring duplicate put entry %s", + e->toString().c_str()); + return; + } else { + LOG(error, "Entry %s was already present." + "Was trying to insert %s.", + it->entry->toString().c_str(), + e->toString().c_str()); + LOG_ABORT("should not reach here"); + } } } + _entries.insert(it, BucketEntry(e, gid)); } - _entries.insert(it, BucketEntry(e, gid)); - } - // GID map points to newest entry for that particular GID - if (gidIt != _gidMap.end()) { - if (gidIt->second->getTimestamp() < e->getTimestamp()) { - // TODO(vekterli): add support for cheap info updates for putting - // newer versions of a document etc. by XORing away old checksum. - gidIt->second = e; - } else { - LOG(spam, - "Newly inserted entry %s was older than existing entry %s; " - "not updating GID mapping", - e->toString().c_str(), - gidIt->second->toString().c_str()); - } - _outdatedInfo = true; - } else { - _gidMap.insert(GidMapType::value_type(gid, e)); - // Since GID didn't exist before, it means we can do a running - // update of the bucket info. Bucket checksum is XOR of all entry - // checksums, which is commutative. - // Only bother to update if we don't have to re-do it all afterwards - // anyway. - // Updating bucketinfo before we update entries since we assume rest - // of function is nothrow. - if (!_outdatedInfo) { - if (!e->isRemove()) { - _info = BucketInfo(updateRollingChecksum( - computeEntryChecksum(BucketEntry(e, gid))), - _info.getDocumentCount() + 1, - _info.getDocumentSize() + e->getSize(), - _info.getEntryCount() + 1, - _info.getUsedSize() + e->getSize(), - _info.getReady(), - _info.getActive()); + // GID map points to newest entry for that particular GID + if (gidIt != _gidMap.end()) { + if (gidIt->second->getTimestamp() < e->getTimestamp()) { + // TODO(vekterli): add support for cheap info updates for putting + // newer versions of a document etc. by XORing away old checksum. 
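+                // Re-point the GID map at the newest entry; the cached bucket info
+                // is invalidated (_outdatedInfo) rather than patched in place.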
+ gidIt->second = e; } else { - _info = BucketInfo(_info.getChecksum(), - _info.getDocumentCount(), - _info.getDocumentSize(), - _info.getEntryCount() + 1, - _info.getUsedSize() + e->getSize(), - _info.getReady(), - _info.getActive()); + LOG(spam, + "Newly inserted entry %s was older than existing entry %s; " + "not updating GID mapping", + e->toString().c_str(), + gidIt->second->toString().c_str()); } + _outdatedInfo = true; + } else { + _gidMap.insert(GidMapType::value_type(gid, e)); + // Since GID didn't exist before, it means we can do a running + // update of the bucket info. Bucket checksum is XOR of all entry + // checksums, which is commutative. + // Only bother to update if we don't have to re-do it all afterwards + // anyway. + // Updating bucketinfo before we update entries since we assume rest + // of function is nothrow. + if (!_outdatedInfo) { + if (!e->isRemove()) { + _info = BucketInfo(updateRollingChecksum( + computeEntryChecksum(BucketEntry(e, gid))), + _info.getDocumentCount() + 1, + _info.getDocumentSize() + e->getSize(), + _info.getEntryCount() + 1, + _info.getUsedSize() + e->getSize(), + _info.getReady(), + _info.getActive()); + } else { + _info = BucketInfo(_info.getChecksum(), + _info.getDocumentCount(), + _info.getDocumentSize(), + _info.getEntryCount() + 1, + _info.getUsedSize() + e->getSize(), + _info.getReady(), + _info.getActive()); + } - LOG(spam, - "After cheap bucketinfo update, state is %s (inserted %s)", - _info.toString().c_str(), - e->toString().c_str()); + LOG(spam, + "After cheap bucketinfo update, state is %s (inserted %s)", + _info.toString().c_str(), + e->toString().c_str()); + } } - } - - assert(_outdatedInfo || _info.getEntryCount() == _entries.size()); -} -DocEntry::SP -BucketContent::getEntry(const DocumentId& did) const -{ - GidMapType::const_iterator it(_gidMap.find(did.getGlobalId())); - if (it != _gidMap.end()) { - return it->second; + assert(_outdatedInfo || _info.getEntryCount() == _entries.size()); } - return DocEntry::SP(); -} - -DocEntry::SP -BucketContent::getEntry(Timestamp t) const -{ - std::vector::const_iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); - if (iter == _entries.end() || iter->entry->getTimestamp() != t) { + DocEntry::SP + BucketContent::getEntry(const DocumentId &did) const { + GidMapType::const_iterator it(_gidMap.find(did.getGlobalId())); + if (it != _gidMap.end()) { + return it->second; + } return DocEntry::SP(); - } else { - return iter->entry; } -} -void -BucketContent::eraseEntry(Timestamp t) -{ - std::vector::iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); - - if (iter != _entries.end() && iter->entry->getTimestamp() == t) { - assert(iter->entry->getDocumentId() != 0); - GidMapType::iterator gidIt( - _gidMap.find(iter->entry->getDocumentId()->getGlobalId())); - assert(gidIt != _gidMap.end()); - _entries.erase(iter); - if (gidIt->second->getTimestamp() == t) { - LOG(debug, "erasing timestamp %" PRIu64 " from GID map", t.getValue()); - // TODO(vekterli): O(1) bucket info update for this case - // FIXME: is this correct? seems like it could cause wrong behavior! 
- _gidMap.erase(gidIt); - } // else: not erasing newest entry, cannot erase from GID map - _outdatedInfo = true; - } -} - -DummyPersistence::DummyPersistence(const std::shared_ptr& repo) - : _initialized(false), - _repo(repo), - _content(), - _nextIterator(1), - _iterators(), - _monitor(), - _clusterState() -{} + DocEntry::SP + BucketContent::getEntry(Timestamp t) const { + std::vector::const_iterator iter = + lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); -DummyPersistence::~DummyPersistence() = default; + if (iter == _entries.end() || iter->entry->getTimestamp() != t) { + return DocEntry::SP(); + } else { + return iter->entry; + } + } -document::select::Node::UP -DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf) -{ - document::select::Node::UP ret; - try { - document::select::Parser parser(*_repo, document::BucketIdFactory()); - ret = parser.parse(documentSelection); - } catch (document::select::ParsingFailedException& e) { - return document::select::Node::UP(); + void + BucketContent::eraseEntry(Timestamp t) { + std::vector::iterator iter = + lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); + + if (iter != _entries.end() && iter->entry->getTimestamp() == t) { + assert(iter->entry->getDocumentId() != 0); + GidMapType::iterator gidIt( + _gidMap.find(iter->entry->getDocumentId()->getGlobalId())); + assert(gidIt != _gidMap.end()); + _entries.erase(iter); + if (gidIt->second->getTimestamp() == t) { + LOG(debug, "erasing timestamp %" PRIu64 " from GID map", t.getValue()); + // TODO(vekterli): O(1) bucket info update for this case + // FIXME: is this correct? seems like it could cause wrong behavior! + _gidMap.erase(gidIt); + } // else: not erasing newest entry, cannot erase from GID map + _outdatedInfo = true; + } } - if (ret->isLeafNode() && !allowLeaf) { - return document::select::Node::UP(); + + DummyPersistence::DummyPersistence(const std::shared_ptr &repo) + : _initialized(false), + _repo(repo), + _content(), + _nextIterator(1), + _iterators(), + _monitor(), + _clusterState() {} + + DummyPersistence::~DummyPersistence() = default; + + document::select::Node::UP + DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) { + document::select::Node::UP ret; + try { + document::select::Parser parser(*_repo, document::BucketIdFactory()); + ret = parser.parse(documentSelection); + } catch (document::select::ParsingFailedException &e) { + return document::select::Node::UP(); + } + if (ret->isLeafNode() && !allowLeaf) { + return document::select::Node::UP(); + } + return ret; } - return ret; -} -Result -DummyPersistence::initialize() -{ - assert(!_initialized); - _initialized = true; - return Result(); -} + Result + DummyPersistence::initialize() { + assert(!_initialized); + _initialized = true; + return Result(); + } #define DUMMYPERSISTENCE_VERIFY_INITIALIZED \ - if (!_initialized) throw vespalib::IllegalStateException( \ - "initialize() must always be called first in order to " \ - "trigger lazy initialization.", VESPA_STRLOC) - + if (!_initialized) { \ + LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \ + abort(); \ + } BucketIdListResult DummyPersistence::listBuckets(BucketSpace bucketSpace) const @@ -715,7 +707,7 @@ DummyPersistence::destroyIterator(IteratorId id, Context&) } void -DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +DummyPersistence::createBucketAsync(const Bucket& b, 
Context&, OperationComplete::UP onComplete) noexcept
 {
     DUMMYPERSISTENCE_VERIFY_INITIALIZED;
     LOG(debug, "createBucket(%s)", b.toString().c_str());
@@ -731,7 +723,7 @@ DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete
 }
 
 void
-DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
 {
     DUMMYPERSISTENCE_VERIFY_INITIALIZED;
     LOG(debug, "deleteBucket(%s)", b.toString().c_str());
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 2ab97b0b403..a25bf6b8a8e 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -168,8 +168,8 @@ public:
     IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
     Result destroyIterator(IteratorId, Context&) override;
 
-    void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
-    void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+    void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
+    void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
 
     Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
index 02c626ea23e..7b04498205d 100644
--- a/persistence/src/vespa/persistence/spi/catchresult.h
+++ b/persistence/src/vespa/persistence/spi/catchresult.h
@@ -19,4 +19,9 @@ private:
     const ResultHandler *_resulthandler;
 };
 
+class NoopOperationComplete : public OperationComplete {
+    void onComplete(std::unique_ptr<Result>) noexcept override { }
+    void addResultHandler(const spi::ResultHandler *) override { }
+};
+
 }
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 09a752d4ded..269175f7d26 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -337,14 +337,14 @@ struct PersistenceProvider
      * Tells the provider that the given bucket has been created in the
      * service layer. There is no requirement to do anything here.
      */
-    virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
+    virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
 
     /**
      * Deletes the given bucket and all entries contained in that bucket.
      * After this operation has succeeded, a restart of the provider should
      * not yield the bucket in getBucketList().
      */
-    virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
+    virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
 
     /**
     * This function is called continuously by the service layer. 
It allows the diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 952c2218140..2e1fc74037c 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -549,7 +549,7 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) void -PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) +PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); @@ -569,7 +569,7 @@ PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComple void -PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 9cabebc8135..fe564d01459 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -114,8 +114,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) override; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 302e603a9d8..02b43a32df3 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -24,7 +24,7 @@ #define CHECK_ERROR_ASYNC(className, failType, onError) \ { \ - Guard guard(_lock); \ + Guard guard(_lock); \ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ onError->onComplete(std::make_unique(_result.getErrorCode(), _result.getErrorMessage())); \ return; \ @@ -81,7 +81,7 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const } void -PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { LOG_SPI("createBucket(" << bucket << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); @@ -177,7 +177,7 @@ 
 
 void
 PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
-                                              spi::OperationComplete::UP operationComplete)
+                                              spi::OperationComplete::UP operationComplete) noexcept
 {
     LOG_SPI("deleteBucket(" << bucket << ")");
     CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 3cb7b92356b..cfc7002a643 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
     void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
                              spi::OperationComplete::UP up) override;
-    void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+    void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
     spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
     spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
     void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
@@ -111,7 +111,7 @@ public:
     spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
     spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
-    void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+    void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
     spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
                       const spi::Bucket& target2, spi::Context&) override;
     spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 75d9b595c4f..a3f0182ba30 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,13 +62,13 @@ public:
         return PersistenceProviderWrapper::getBucketInfo(bucket);
     }
 
-    void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
+    void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
         ++_createBucketInvocations;
         PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
     }
 
     void
-    deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
+    deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
         ++_deleteBucketInvocations;
         PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
     }
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 017b8ce2b92..75e85fb4b6f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -872,7 +872,6 @@ TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
     setUpChain(MIDDLE);
 
     ExpectedExceptionSpec exceptions[] = {
-        { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
"create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, @@ -903,7 +902,6 @@ TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 5da2b65b7ff..bc6e67578c0 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -5,6 +5,7 @@ #include "testandsethelper.h" #include "bucketownershipnotifier.h" #include +#include #include #include #include @@ -91,11 +92,6 @@ private: vespalib::ISequencedTaskExecutor::ExecutorId _executorId; }; -struct Noop : public spi::OperationComplete { - void onComplete(std::unique_ptr) override { } - void addResultHandler(const spi::ResultHandler *) override { } -}; - bool bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { // Don't check document sizes, as background moving of documents in Proton @@ -174,7 +170,7 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker:: }); if (cmd.getActive()) { - _spi.createBucketAsync(bucket, tracker->context(), std::make_unique()); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique()); _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique(_sequencedExecutor, bucket, std::move(task))); } else { _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(_sequencedExecutor, bucket, std::move(task))); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7727a591af7..34466e527a0 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -6,6 +6,7 @@ #include "apply_bucket_diff_state.h" #include #include +#include #include #include #include @@ -682,16 +683,6 @@ findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHa } } -struct CheckResult : public spi::OperationComplete { - spi::Bucket _bucket; - const char *_msg; - CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { } - void onComplete(std::unique_ptr result) override { - checkResult(*result, _bucket, _msg); - } - void addResultHandler(const spi::ResultHandler *) override { } -}; - } api::StorageReply::SP @@ -910,7 +901,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker->fail(api::ReturnCode::BUSY, err); return tracker; } - _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(bucket, "create bucket")); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique()); MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); @@ -1073,7 +1064,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - _spi.createBucketAsync(bucket, 
+    _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
     return handleGetBucketDiffStage2(cmd, std::move(tracker));
 }
 
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 5c27d112a7c..9ccd901744b 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -100,14 +100,14 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context&
 }
 
 void
-ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
 {
     onComplete->addResultHandler(this);
     _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
 }
 
 void
-ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
 {
     onComplete->addResultHandler(this);
     _impl.createBucketAsync(bucket, context, std::move(onComplete));
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index e999f33f2bd..14d20cf8a52 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -62,8 +62,8 @@ public:
     void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
     void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
     void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
-    void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
-    void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+    void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
+    void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
     std::unique_ptr register_executor(std::shared_ptr executor) override;
 private:
     template
--
cgit v1.2.3

From a0a8ae79243f77e032b91171097e8339e1f63c55 Mon Sep 17 00:00:00 2001
From: Henning Baldersheim
Date: Mon, 25 Oct 2021 12:08:32 +0000
Subject: GC unused code

---
 storage/src/vespa/storage/persistence/mergehandler.cpp | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 34466e527a0..4cd0b181155 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -7,7 +7,6 @@
 #include
 #include
 #include
-#include <vespa/vespalib/stllike/asciistream.h>
 #include
 #include
 #include
@@ -51,20 +50,6 @@ constexpr int getDeleteFlag() {
     return 2;
 }
 
-/**
- * Throws std::runtime_error if result has an error.
- */
-void
-checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op)
-{
-    if (result.hasError()) {
-        vespalib::asciistream ss;
-        ss << "Failed " << op << " in " << bucket << ": " << result.toString();
-        throw std::runtime_error(ss.str());
-    }
-}
-
-
 class IteratorGuard {
     spi::PersistenceProvider& _spi;
     spi::IteratorId _iteratorId;
--
cgit v1.2.3

From 90676b68765027c1df6c4a3f22530a45118b4f3c Mon Sep 17 00:00:00 2001
From: Henning Baldersheim
Date: Mon, 25 Oct 2021 12:42:40 +0000
Subject: Undo auto format

---
 .../persistence/dummyimpl/dummypersistence.cpp     | 447 ++++++++++-----------
 1 file changed, 217 insertions(+), 230 deletions(-)

diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 944217796ab..74fef13f141 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -27,108 +27,109 @@ using document::FixedBucketSpaces;
 
 namespace storage::spi::dummy {
 
-    BucketContent::BucketContent()
-            : _entries(),
-              _gidMap(),
-              _info(),
-              _inUse(false),
-              _outdatedInfo(true),
-              _active(false) {}
-
-    BucketContent::~BucketContent() = default;
-
-    uint32_t
-    BucketContent::computeEntryChecksum(const BucketEntry &e) const {
-        vespalib::crc_32_type checksummer;
-
-        uint64_t ts(e.entry->getTimestamp());
-        checksummer.process_bytes(&e.gid, sizeof(GlobalId));
-        checksummer.process_bytes(&ts, sizeof(uint64_t));
-        return checksummer.checksum();
-    }
-
-    BucketChecksum
-    BucketContent::updateRollingChecksum(uint32_t entryChecksum) {
-        uint32_t checksum = _info.getChecksum();
-        checksum ^= entryChecksum;
-        if (checksum == 0) {
-            checksum = 1;
-        }
-        return BucketChecksum(checksum);
-    }
+BucketContent::BucketContent()
+    : _entries(),
+      _gidMap(),
+      _info(),
+      _inUse(false),
+      _outdatedInfo(true),
+      _active(false)
+{}
+
+BucketContent::~BucketContent() = default;
+
+uint32_t
+BucketContent::computeEntryChecksum(const BucketEntry &e) const {
+    vespalib::crc_32_type checksummer;
+
+    uint64_t ts(e.entry->getTimestamp());
+    checksummer.process_bytes(&e.gid, sizeof(GlobalId));
+    checksummer.process_bytes(&ts, sizeof(uint64_t));
+    return checksummer.checksum();
+}
+
+BucketChecksum
+BucketContent::updateRollingChecksum(uint32_t entryChecksum) {
+    uint32_t checksum = _info.getChecksum();
+    checksum ^= entryChecksum;
+    if (checksum == 0) {
+        checksum = 1;
+    }
+    return BucketChecksum(checksum);
+}
 
-    const BucketInfo &
-    BucketContent::getBucketInfo() const {
-        if (!_outdatedInfo) {
-            return _info;
-        }
+const BucketInfo &
+BucketContent::getBucketInfo() const {
+    if (!_outdatedInfo) {
+        return _info;
+    }
 
-        // Checksum should only depend on the newest entry for each document that
-        // has not been removed.
-        uint32_t unique = 0;
-        uint32_t uniqueSize = 0;
-        uint32_t totalSize = 0;
-        uint32_t checksum = 0;
+    // Checksum should only depend on the newest entry for each document that
+    // has not been removed.
+    uint32_t unique = 0;
+    uint32_t uniqueSize = 0;
+    uint32_t totalSize = 0;
+    uint32_t checksum = 0;
 
-        for (const BucketEntry &bucketEntry: _entries) {
-            const DocEntry &entry(*bucketEntry.entry);
-            const GlobalId &gid(bucketEntry.gid);
+    for (const BucketEntry &bucketEntry: _entries) {
+        const DocEntry &entry(*bucketEntry.entry);
+        const GlobalId &gid(bucketEntry.gid);
 
-            GidMapType::const_iterator gidIt(_gidMap.find(gid));
-            assert(gidIt != _gidMap.end());
+        GidMapType::const_iterator gidIt(_gidMap.find(gid));
+        assert(gidIt != _gidMap.end());
 
-            totalSize += entry.getSize();
-            if (entry.isRemove()) {
-                continue;
-            }
-            // Only include if we're newest entry for the particular GID
-            if (gidIt->second.get() != &entry) {
-                continue;
-            }
-            ++unique;
-            uniqueSize += entry.getSize();
-
-            checksum ^= computeEntryChecksum(bucketEntry);
+        totalSize += entry.getSize();
+        if (entry.isRemove()) {
+            continue;
         }
-        if (!unique) {
-            checksum = 0;
-        } else if (checksum == 0) {
-            checksum = 1;
+        // Only include if we're newest entry for the particular GID
+        if (gidIt->second.get() != &entry) {
+            continue;
         }
+        ++unique;
+        uniqueSize += entry.getSize();
 
-        _info = BucketInfo(BucketChecksum(checksum),
-                           unique,
-                           uniqueSize,
-                           _entries.size(),
-                           totalSize,
-                           BucketInfo::READY,
-                           _active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE);
-
-        _outdatedInfo = false;
-        return _info;
+        checksum ^= computeEntryChecksum(bucketEntry);
+    }
+    if (!unique) {
+        checksum = 0;
+    } else if (checksum == 0) {
+        checksum = 1;
     }
 
-    namespace {
+    _info = BucketInfo(BucketChecksum(checksum),
+                       unique,
+                       uniqueSize,
+                       _entries.size(),
+                       totalSize,
+                       BucketInfo::READY,
+                       _active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE);
 
-    struct TimestampLess {
-        bool operator()(const BucketEntry &bucketEntry, Timestamp t) {
-            return bucketEntry.entry->getTimestamp() < t;
-        }
+    _outdatedInfo = false;
+    return _info;
+}
 
-        bool operator()(Timestamp t, const BucketEntry &bucketEntry) {
-            return t < bucketEntry.entry->getTimestamp();
-        }
-    };
+namespace {
 
-    } // namespace
+struct TimestampLess {
+    bool operator()(const BucketEntry &bucketEntry, Timestamp t) {
+        return bucketEntry.entry->getTimestamp() < t;
+    }
 
-    bool
-    BucketContent::hasTimestamp(Timestamp t) const {
-        if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) {
-            return false;
-        }
-        return binary_search(_entries.begin(), _entries.end(), t, TimestampLess());
+    bool operator()(Timestamp t, const BucketEntry &bucketEntry) {
+        return t < bucketEntry.entry->getTimestamp();
     }
+};
+
+} // namespace
+
+bool
+BucketContent::hasTimestamp(Timestamp t) const {
+    if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) {
+        return false;
+    }
+    return binary_search(_entries.begin(), _entries.end(), t, TimestampLess());
+}
 
 /**
  * GID map semantics:
 ...
 * document), we can remove the mapping entirely.
 */
 
-    void
-    BucketContent::insert(DocEntry::SP e) {
-        LOG(spam, "insert(%s)", e->toString().c_str());
-        const DocumentId *docId(e->getDocumentId());
-        assert(docId != 0);
-        GlobalId gid(docId->getGlobalId());
-        GidMapType::iterator gidIt(_gidMap.find(gid));
-
-        if (!_entries.empty() &&
-            _entries.back().entry->getTimestamp() < e->getTimestamp()) {
-            _entries.push_back(BucketEntry(e, gid));
-        } else {
-            std::vector<BucketEntry>::iterator it =
-                    lower_bound(_entries.begin(),
-                                _entries.end(),
-                                e->getTimestamp(),
-                                TimestampLess());
-            if (it != _entries.end()) {
-                if (it->entry->getTimestamp() == e->getTimestamp()) {
-                    if (*it->entry.get() == *e) {
-                        LOG(debug, "Ignoring duplicate put entry %s",
-                            e->toString().c_str());
-                        return;
-                    } else {
-                        LOG(error, "Entry %s was already present."
-                                   "Was trying to insert %s.",
-                            it->entry->toString().c_str(),
-                            e->toString().c_str());
-                        LOG_ABORT("should not reach here");
-                    }
-                }
-            }
-            _entries.insert(it, BucketEntry(e, gid));
-        }
+void
+BucketContent::insert(DocEntry::SP e) {
+    LOG(spam, "insert(%s)", e->toString().c_str());
+    const DocumentId *docId(e->getDocumentId());
+    assert(docId != 0);
+    GlobalId gid(docId->getGlobalId());
+    GidMapType::iterator gidIt(_gidMap.find(gid));
+
+    if (!_entries.empty() &&
+        _entries.back().entry->getTimestamp() < e->getTimestamp()) {
+        _entries.push_back(BucketEntry(e, gid));
+    } else {
+        auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess());
+        if (it != _entries.end()) {
+            if (it->entry->getTimestamp() == e->getTimestamp()) {
+                if (*it->entry.get() == *e) {
+                    LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str());
+                    return;
+                } else {
+                    LOG(error, "Entry %s was already present. Was trying to insert %s.",
+                        it->entry->toString().c_str(), e->toString().c_str());
+                    LOG_ABORT("should not reach here");
+                }
+            }
+        }
+        _entries.insert(it, BucketEntry(e, gid));
+    }
 
-        // GID map points to newest entry for that particular GID
-        if (gidIt != _gidMap.end()) {
-            if (gidIt->second->getTimestamp() < e->getTimestamp()) {
-                // TODO(vekterli): add support for cheap info updates for putting
-                // newer versions of a document etc. by XORing away old checksum.
-                gidIt->second = e;
+    // GID map points to newest entry for that particular GID
+    if (gidIt != _gidMap.end()) {
+        if (gidIt->second->getTimestamp() < e->getTimestamp()) {
+            // TODO(vekterli): add support for cheap info updates for putting
+            // newer versions of a document etc. by XORing away old checksum.
+            gidIt->second = e;
+        } else {
+            LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping",
+                e->toString().c_str(), gidIt->second->toString().c_str());
+        }
+        _outdatedInfo = true;
+    } else {
+        _gidMap.insert(GidMapType::value_type(gid, e));
+        // Since GID didn't exist before, it means we can do a running
+        // update of the bucket info. Bucket checksum is XOR of all entry
+        // checksums, which is commutative.
+        // Only bother to update if we don't have to re-do it all afterwards
+        // anyway.
+        // Updating bucketinfo before we update entries since we assume rest
+        // of function is nothrow.
+        if (!_outdatedInfo) {
+            if (!e->isRemove()) {
+                _info = BucketInfo(updateRollingChecksum(
+                                           computeEntryChecksum(BucketEntry(e, gid))),
+                                   _info.getDocumentCount() + 1,
+                                   _info.getDocumentSize() + e->getSize(),
+                                   _info.getEntryCount() + 1,
+                                   _info.getUsedSize() + e->getSize(),
+                                   _info.getReady(),
+                                   _info.getActive());
             } else {
-                LOG(spam,
-                    "Newly inserted entry %s was older than existing entry %s; "
-                    "not updating GID mapping",
-                    e->toString().c_str(),
-                    gidIt->second->toString().c_str());
+                _info = BucketInfo(_info.getChecksum(),
+                                   _info.getDocumentCount(),
+                                   _info.getDocumentSize(),
+                                   _info.getEntryCount() + 1,
+                                   _info.getUsedSize() + e->getSize(),
+                                   _info.getReady(),
+                                   _info.getActive());
             }
-            _outdatedInfo = true;
-        } else {
-            _gidMap.insert(GidMapType::value_type(gid, e));
-            // Since GID didn't exist before, it means we can do a running
-            // update of the bucket info. Bucket checksum is XOR of all entry
-            // checksums, which is commutative.
-            // Only bother to update if we don't have to re-do it all afterwards
-            // anyway.
-            // Updating bucketinfo before we update entries since we assume rest
-            // of function is nothrow.
-            if (!_outdatedInfo) {
-                if (!e->isRemove()) {
-                    _info = BucketInfo(updateRollingChecksum(
-                                               computeEntryChecksum(BucketEntry(e, gid))),
-                                       _info.getDocumentCount() + 1,
-                                       _info.getDocumentSize() + e->getSize(),
-                                       _info.getEntryCount() + 1,
-                                       _info.getUsedSize() + e->getSize(),
-                                       _info.getReady(),
-                                       _info.getActive());
-                } else {
-                    _info = BucketInfo(_info.getChecksum(),
-                                       _info.getDocumentCount(),
-                                       _info.getDocumentSize(),
-                                       _info.getEntryCount() + 1,
-                                       _info.getUsedSize() + e->getSize(),
-                                       _info.getReady(),
-                                       _info.getActive());
-                }
-                LOG(spam,
-                    "After cheap bucketinfo update, state is %s (inserted %s)",
-                    _info.toString().c_str(),
-                    e->toString().c_str());
-            }
+            LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)",
+                _info.toString().c_str(), e->toString().c_str());
         }
-
-        assert(_outdatedInfo || _info.getEntryCount() == _entries.size());
     }
 
-    DocEntry::SP
-    BucketContent::getEntry(const DocumentId &did) const {
-        GidMapType::const_iterator it(_gidMap.find(did.getGlobalId()));
-        if (it != _gidMap.end()) {
-            return it->second;
-        }
-        return DocEntry::SP();
+    assert(_outdatedInfo || _info.getEntryCount() == _entries.size());
+}
+
+DocEntry::SP
+BucketContent::getEntry(const DocumentId &did) const {
+    auto it(_gidMap.find(did.getGlobalId()));
+    if (it != _gidMap.end()) {
+        return it->second;
     }
+    return DocEntry::SP();
+}
 
-    DocEntry::SP
-    BucketContent::getEntry(Timestamp t) const {
-        std::vector<BucketEntry>::const_iterator iter =
-                lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+DocEntry::SP
+BucketContent::getEntry(Timestamp t) const {
+    auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
 
-        if (iter == _entries.end() || iter->entry->getTimestamp() != t) {
-            return DocEntry::SP();
-        } else {
-            return iter->entry;
-        }
+    if (iter == _entries.end() || iter->entry->getTimestamp() != t) {
+        return DocEntry::SP();
+    } else {
+        return iter->entry;
     }
+}
 
-    void
-    BucketContent::eraseEntry(Timestamp t) {
-        std::vector<BucketEntry>::iterator iter =
-                lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
-
-        if (iter != _entries.end() && iter->entry->getTimestamp() == t) {
-            assert(iter->entry->getDocumentId() != 0);
-            GidMapType::iterator gidIt(
-                    _gidMap.find(iter->entry->getDocumentId()->getGlobalId()));
-            assert(gidIt != _gidMap.end());
-            _entries.erase(iter);
-            if (gidIt->second->getTimestamp() == t) {
-                LOG(debug, "erasing
timestamp %" PRIu64 " from GID map", t.getValue()); - // TODO(vekterli): O(1) bucket info update for this case - // FIXME: is this correct? seems like it could cause wrong behavior! - _gidMap.erase(gidIt); - } // else: not erasing newest entry, cannot erase from GID map - _outdatedInfo = true; - } +void +BucketContent::eraseEntry(Timestamp t) { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); + + if (iter != _entries.end() && iter->entry->getTimestamp() == t) { + assert(iter->entry->getDocumentId() != 0); + GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId()); + assert(gidIt != _gidMap.end()); + _entries.erase(iter); + if (gidIt->second->getTimestamp() == t) { + LOG(debug, "erasing timestamp %" PRIu64 " from GID map", t.getValue()); + // TODO(vekterli): O(1) bucket info update for this case + // FIXME: is this correct? seems like it could cause wrong behavior! + _gidMap.erase(gidIt); + } // else: not erasing newest entry, cannot erase from GID map + _outdatedInfo = true; } +} - DummyPersistence::DummyPersistence(const std::shared_ptr &repo) - : _initialized(false), - _repo(repo), - _content(), - _nextIterator(1), - _iterators(), - _monitor(), - _clusterState() {} - - DummyPersistence::~DummyPersistence() = default; - - document::select::Node::UP - DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) { - document::select::Node::UP ret; - try { - document::select::Parser parser(*_repo, document::BucketIdFactory()); - ret = parser.parse(documentSelection); - } catch (document::select::ParsingFailedException &e) { - return document::select::Node::UP(); - } - if (ret->isLeafNode() && !allowLeaf) { - return document::select::Node::UP(); - } - return ret; - } +DummyPersistence::DummyPersistence(const std::shared_ptr &repo) + : _initialized(false), + _repo(repo), + _content(), + _nextIterator(1), + _iterators(), + _monitor(), + _clusterState() +{} + +DummyPersistence::~DummyPersistence() = default; + +document::select::Node::UP +DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) { + document::select::Node::UP ret; + try { + document::select::Parser parser(*_repo, document::BucketIdFactory()); + ret = parser.parse(documentSelection); + } catch (document::select::ParsingFailedException &e) { + return document::select::Node::UP(); + } + if (ret->isLeafNode() && !allowLeaf) { + return document::select::Node::UP(); + } + return ret; +} - Result - DummyPersistence::initialize() { - assert(!_initialized); - _initialized = true; - return Result(); - } +Result +DummyPersistence::initialize() { + assert(!_initialized); + _initialized = true; + return Result(); +} #define DUMMYPERSISTENCE_VERIFY_INITIALIZED \ if (!_initialized) { \ -- cgit v1.2.3