From e59348551c200c15f5dc4158274d96b57b91227f Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 21 Oct 2021 16:54:10 +0000 Subject: Async createBucket --- .../persistence/dummyimpl/dummypersistence.cpp | 6 +- .../vespa/persistence/dummyimpl/dummypersistence.h | 2 +- .../persistence/spi/abstractpersistenceprovider.h | 1 - .../vespa/persistence/spi/persistenceprovider.cpp | 8 + .../vespa/persistence/spi/persistenceprovider.h | 3 +- .../proton/persistenceengine/persistenceengine.cpp | 18 +- .../proton/persistenceengine/persistenceengine.h | 2 +- .../common/persistenceproviderwrapper.cpp | 8 +- .../common/persistenceproviderwrapper.h | 2 +- .../filestorage/operationabortingtest.cpp | 4 +- .../src/vespa/storage/persistence/asynchandler.cpp | 28 +++ .../src/vespa/storage/persistence/asynchandler.h | 1 + .../src/vespa/storage/persistence/mergehandler.cpp | 265 +++++++++++---------- .../src/vespa/storage/persistence/mergehandler.h | 1 + .../storage/persistence/persistencehandler.cpp | 2 +- .../storage/persistence/provider_error_wrapper.cpp | 11 +- .../storage/persistence/provider_error_wrapper.h | 2 +- .../storage/persistence/simplemessagehandler.cpp | 16 -- .../storage/persistence/simplemessagehandler.h | 1 - 19 files changed, 206 insertions(+), 175 deletions(-) diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 6e4f38fe564..b8a390ed0ce 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -714,8 +714,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&) return Result(); } -Result -DummyPersistence::createBucket(const Bucket& b, Context&) +void +DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "createBucket(%s)", b.toString().c_str()); @@ -727,7 +727,7 @@ DummyPersistence::createBucket(const Bucket& b, Context&) assert(!_content[b]->_inUse); LOG(debug, "%s already existed", b.toString().c_str()); } - return Result(); + onComplete->onComplete(std::make_unique()); } void diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 99d6ba717b7..2ab97b0b403 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -168,7 +168,7 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket&, Context&) override; + void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index e287bdc5252..3b59f20ca96 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider { public: Result initialize() override { return Result(); }; - Result createBucket(const Bucket&, Context&) override { return Result(); } Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 3ea476c33fc..31db08a6f4f 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -16,6 +16,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat return *future.get(); } +Result +PersistenceProvider::createBucket(const Bucket& bucket, Context& context) { + auto catcher = std::make_unique(); + auto future = catcher->future_result(); + createBucketAsync(bucket, context, std::move(catcher)); + return *future.get(); +} + Result PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) { auto catcher = std::make_unique(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 83eb042d855..09a752d4ded 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -58,6 +58,7 @@ struct PersistenceProvider virtual ~PersistenceProvider(); // TODO Move to utility class for use in tests only + Result createBucket(const Bucket&, Context&); Result deleteBucket(const Bucket&, Context&); Result put(const Bucket&, Timestamp, DocumentSP, Context&); Result setActiveState(const Bucket&, BucketInfo::ActiveState); @@ -336,7 +337,7 @@ struct PersistenceProvider * Tells the provider that the given bucket has been created in the * service layer. There is no requirement to do anything here. */ - virtual Result createBucket(const Bucket&, Context&) = 0; + virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0; /** * Deletes the given bucket and all entries contained in that bucket. diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 114292d055d..952c2218140 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -548,19 +548,23 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) } -Result -PersistenceEngine::createBucket(const Bucket &b, Context &) +void +PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleCreateBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleCreateBucket(feedtoken::make(transportContext), b); + } else { + handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 94331ac2cd6..9cabebc8135 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -114,7 +114,7 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket &bucketId, Context &) override ; + void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) override; void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index b3bd1c6a253..302e603a9d8 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const return _spi.listBuckets(bucketSpace); } -spi::Result -PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { LOG_SPI("createBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET); - return _spi.createBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); + return _spi.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketInfoResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index c6628814dba..3cb7b92356b 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -96,7 +96,7 @@ public: void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state, spi::OperationComplete::UP up) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 07d2b24d536..75d9b595c4f 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -62,9 +62,9 @@ public: return PersistenceProviderWrapper::getBucketInfo(bucket); } - spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { ++_createBucketInvocations; - return PersistenceProviderWrapper::createBucket(bucket, ctx); + PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete)); } void diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 47b5e4f5f27..b3e9d596c76 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -153,6 +153,34 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons return trackerUP; } +MessageTracker::UP +AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.createBuckets); + LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); + } + spi::Bucket bucket(cmd.getBucket()); + auto task = makeResultTask([this, tracker = std::move(tracker), bucket, active=cmd.getActive()](spi::Result::UP ignored) mutable { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + if (active) { + auto nestedTask = makeResultTask([tracker = std::move(tracker)](spi::Result::UP nestedIgnored) { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) nestedIgnored; + tracker->sendReply(); + }); + _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique(_sequencedExecutor, bucket, std::move(nestedTask))); + } else { + tracker->sendReply(); + } + }); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(_sequencedExecutor, bucket, std::move(task))); + + return tracker; +} + MessageTracker::UP AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const { diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 4f5c242570c..db5a77bfb59 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -30,6 +30,7 @@ public: MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 77e7762ec9a..2f19b8695a4 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include LOG_SETUP(".persistence.mergehandler"); @@ -663,24 +662,25 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const } namespace { - void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, - uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) - { - for (const auto& entry : status.diff) { - uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); - if ((entry_has_mask == 0u) || - (constrictHasMask && (entry_has_mask != hasMask))) { - continue; - } - cmd.getDiff().emplace_back(entry); - if (constrictHasMask) { - cmd.getDiff().back()._entry._hasMask = newHasMask; - } else { - cmd.getDiff().back()._entry._hasMask = entry_has_mask; - } +void +findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, + uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) +{ + for (const auto& entry : status.diff) { + uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); + if ((entry_has_mask == 0u) || + (constrictHasMask && (entry_has_mask != hasMask))) { + continue; + } + cmd.getDiff().emplace_back(entry); + if (constrictHasMask) { + cmd.getDiff().back()._entry._hasMask = newHasMask; + } else { + cmd.getDiff().back()._entry._hasMask = entry_has_mask; } } } +} api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, @@ -938,141 +938,146 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP namespace { - uint8_t findOwnIndex( - const std::vector& nodeList, - uint16_t us) - { - for (uint32_t i=0, n=nodeList.size(); i& nodeList, + uint16_t us) +{ + for (uint32_t i=0, n=nodeList.size(); i - { - bool operator()(const api::GetBucketDiffCommand::Entry& x, - const api::GetBucketDiffCommand::Entry& y) const { - return (x._timestamp < y._timestamp); - } - }; - - /** - * Merges list A and list B together and puts the result in result. - * Result is swapped in as last step to keep function exception safe. Thus - * result can be listA or listB if wanted. - * - * listA and listB are assumed to be in the order found in the slotfile, or - * in the order given by a previous call to this function. (In both cases - * this will be sorted by timestamp) - * - * @return false if any suspect entries was found. - */ - bool mergeLists( - const std::vector& listA, - const std::vector& listB, - std::vector& finalResult) - { - bool suspect = false; - std::vector result; - uint32_t i = 0, j = 0; - while (i < listA.size() && j < listB.size()) { - const api::GetBucketDiffCommand::Entry& a(listA[i]); - const api::GetBucketDiffCommand::Entry& b(listB[j]); - if (a._timestamp < b._timestamp) { - result.push_back(a); - ++i; - } else if (a._timestamp > b._timestamp) { - result.push_back(b); - ++j; - } else { - // If we find equal timestamped entries that are not the - // same.. Flag an error. But there is nothing we can do - // about it. Note it as if it is the same entry so we - // dont try to merge them. - if (!(a == b)) { - if (a._gid == b._gid && a._flags == b._flags) { - if ((a._flags & getDeleteFlag()) != 0 && - (b._flags & getDeleteFlag()) != 0) - { - // Unfortunately this can happen, for instance - // if a remove comes to a bucket out of sync - // and reuses different headers in the two - // versions. - LOG(debug, "Found entries with equal timestamps of " - "the same gid who both are remove " - "entries: %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } else { - LOG(error, "Found entries with equal timestamps of " - "the same gid. This is likely same " - "document where size of document varies:" - " %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; - } else if ((a._flags & getDeleteFlag()) - != (b._flags & getDeleteFlag())) +/** + * Merges list A and list B together and puts the result in result. + * Result is swapped in as last step to keep function exception safe. Thus + * result can be listA or listB if wanted. + * + * listA and listB are assumed to be in the order found in the slotfile, or + * in the order given by a previous call to this function. (In both cases + * this will be sorted by timestamp) + * + * @return false if any suspect entries was found. + */ +bool mergeLists( + const std::vector& listA, + const std::vector& listB, + std::vector& finalResult) +{ + bool suspect = false; + std::vector result; + uint32_t i = 0, j = 0; + while (i < listA.size() && j < listB.size()) { + const api::GetBucketDiffCommand::Entry& a(listA[i]); + const api::GetBucketDiffCommand::Entry& b(listB[j]); + if (a._timestamp < b._timestamp) { + result.push_back(a); + ++i; + } else if (a._timestamp > b._timestamp) { + result.push_back(b); + ++j; + } else { + // If we find equal timestamped entries that are not the + // same.. Flag an error. But there is nothing we can do + // about it. Note it as if it is the same entry so we + // dont try to merge them. + if (!(a == b)) { + if (a._gid == b._gid && a._flags == b._flags) { + if ((a._flags & getDeleteFlag()) != 0 && + (b._flags & getDeleteFlag()) != 0) { - // If we find one remove and one put entry on the - // same timestamp we are going to keep the remove - // entry to make the copies consistent. - const api::GetBucketDiffCommand::Entry& deletedEntry( - (a._flags & getDeleteFlag()) != 0 ? a : b); - result.push_back(deletedEntry); - LOG(debug, - "Found put and remove on same timestamp. Keeping" - "remove as it is likely caused by remove with " - "copies unavailable at the time: %s, %s.", - a.toString().c_str(), b.toString().c_str()); + // Unfortunately this can happen, for instance + // if a remove comes to a bucket out of sync + // and reuses different headers in the two + // versions. + LOG(debug, "Found entries with equal timestamps of " + "the same gid who both are remove " + "entries: %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } else { - LOG(error, "Found entries with equal timestamps that " - "weren't the same entry: %s, %s.", - a.toString().c_str(), b.toString().c_str()); - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; + LOG(error, "Found entries with equal timestamps of " + "the same gid. This is likely same " + "document where size of document varies:" + " %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } + result.push_back(a); + result.back()._hasMask |= b._hasMask; + suspect = true; + } else if ((a._flags & getDeleteFlag()) + != (b._flags & getDeleteFlag())) + { + // If we find one remove and one put entry on the + // same timestamp we are going to keep the remove + // entry to make the copies consistent. + const api::GetBucketDiffCommand::Entry& deletedEntry( + (a._flags & getDeleteFlag()) != 0 ? a : b); + result.push_back(deletedEntry); + LOG(debug, + "Found put and remove on same timestamp. Keeping" + "remove as it is likely caused by remove with " + "copies unavailable at the time: %s, %s.", + a.toString().c_str(), b.toString().c_str()); } else { + LOG(error, "Found entries with equal timestamps that " + "weren't the same entry: %s, %s.", + a.toString().c_str(), b.toString().c_str()); result.push_back(a); result.back()._hasMask |= b._hasMask; + suspect = true; } - ++i; - ++j; + } else { + result.push_back(a); + result.back()._hasMask |= b._hasMask; } + ++i; + ++j; } - if (i < listA.size()) { - assert(j >= listB.size()); - for (uint32_t n = listA.size(); i= listA.size()); - for (uint32_t n = listB.size(); j= listB.size()); + for (uint32_t n = listA.size(); i= listA.size()); + for (uint32_t n = listB.size(); j result) override { + checkResult(*result, _bucket, _msg); + } + void addResultHandler(const spi::ResultHandler *) override { } +}; } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const -{ +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(bucket, "create bucket")); + return handleGetBucketDiffStage2(cmd, std::move(tracker)); +} +MessageTracker::UP +MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const +{ + spi::Bucket bucket(cmd.getBucket()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 17cfb847d2c..e6b4d047209 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -86,6 +86,7 @@ private: const uint32_t _commonMergeChainOptimalizationMinimumSize; std::atomic _async_apply_bucket_diff; + MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index aa1a9c136fd..761021a9612 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::REVERT_ID: return _simpleHandler.handleRevert(static_cast(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return _simpleHandler.handleCreateBucket(static_cast(msg), std::move(tracker)); + return _asyncHandler.handleCreateBucket(static_cast(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: return _asyncHandler.handleDeleteBucket(static_cast(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ce424f0ce83..5c27d112a7c 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& return checkResult(_impl.destroyIterator(iteratorId, context)); } -spi::Result -ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.createBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } void -ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) +ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); + _impl.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index c9d2411e372..e999f33f2bd 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -49,7 +49,6 @@ public: spi::Context &context) override; spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -63,6 +62,7 @@ public: void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; std::unique_ptr register_executor(std::shared_ptr executor) override; private: diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4fe207e2e5..9a7a451b906 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -87,22 +87,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t return tracker; } -MessageTracker::UP -SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.createBuckets); - LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); - } - spi::Bucket spiBucket(cmd.getBucket()); - _spi.createBucket(spiBucket, tracker->context()); - if (cmd.getActive()) { - _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); - } - return tracker; -} - MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 2cfbc7016c0..009fd6dff52 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -22,7 +22,6 @@ public: SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&); MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; -- cgit v1.2.3