diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-17 18:31:29 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-17 18:31:29 +0000 |
commit | 8e5767e8d878d59c082ade3ef51f991a81338220 (patch) | |
tree | 5c86157e0449947fafd57c0a904018bc9173fcb5 | |
parent | ab3518e8b3a4caf742e12a134c4fb1d2bbf3c293 (diff) |
Move removeLocation over to Asynchandler and issue all removes for one bucket before waiting for the replies.
Prepare RemoveResult to contain more replies.
7 files changed, 82 insertions, 71 deletions
diff --git a/persistence/src/vespa/persistence/spi/result.h b/persistence/src/vespa/persistence/spi/result.h index 4ec323c6b47..73838e99a18 100644 --- a/persistence/src/vespa/persistence/spi/result.h +++ b/persistence/src/vespa/persistence/spi/result.h @@ -25,14 +25,15 @@ public: /** * Constructor to use for a result where there is no error. */ - Result() : _errorCode(ErrorType::NONE), _errorMessage() {} + Result() noexcept : _errorCode(ErrorType::NONE), _errorMessage() {} /** * Constructor to use when an error has been detected. */ - Result(ErrorType error, const vespalib::string& errorMessage) + Result(ErrorType error, const vespalib::string& errorMessage) noexcept : _errorCode(error), - _errorMessage(errorMessage) {} + _errorMessage(errorMessage) + {} Result(const Result &); Result & operator = (const Result &); @@ -130,21 +131,20 @@ public: * The service layer will not update the bucket information in this case, * so it should not be returned either. */ - RemoveResult(ErrorType error, const vespalib::string& errorMessage) + RemoveResult(ErrorType error, const vespalib::string& errorMessage) noexcept : Result(error, errorMessage), - _wasFound(false) + _numRemoved(0) { } - /** - * Constructor to use when the remove was successful. - */ - RemoveResult(bool foundDocument) - : _wasFound(foundDocument) { } - - bool wasFound() const { return _wasFound; } + explicit RemoveResult(bool found) noexcept + : RemoveResult(found ? 1u : 0u) { } + explicit RemoveResult(uint32_t numRemoved) noexcept + : _numRemoved(numRemoved) { } + bool wasFound() const { return _numRemoved > 0; } + uint32_t num_removed() const { return _numRemoved; } private: - bool _wasFound; + uint32_t _numRemoved; }; class GetResult : public Result { @@ -257,14 +257,14 @@ public: /** * Constructor used when there was an error creating the iterator. */ - CreateIteratorResult(ErrorType error, const vespalib::string& errorMessage) + CreateIteratorResult(ErrorType error, const vespalib::string& errorMessage) noexcept : Result(error, errorMessage), _iterator(0) { } /** * Constructor used when the iterator state was successfully created. */ - CreateIteratorResult(const IteratorId& id) + CreateIteratorResult(const IteratorId& id) noexcept : _iterator(id) { } @@ -299,8 +299,8 @@ public: { } IterateResult(const IterateResult &) = delete; - IterateResult(IterateResult &&rhs) = default; - IterateResult &operator=(IterateResult &&rhs) = default; + IterateResult(IterateResult &&rhs) noexcept = default; + IterateResult &operator=(IterateResult &&rhs) noexcept = default; ~IterateResult(); diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 80047a1797c..18242e62873 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -2,6 +2,7 @@ #include <vespa/document/base/testdocman.h> #include <vespa/storage/persistence/processallhandler.h> +#include <vespa/storage/persistence/asynchandler.h> #include <vespa/storage/persistence/messages.h> #include <tests/persistence/persistencetestutils.h> #include <vespa/document/test/make_document_bucket.h> @@ -36,7 +37,8 @@ TEST_F(ProcessAllHandlerTest, remove_location) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket); - ProcessAllHandler handler(getEnv(), getPersistenceProvider()); + document::BucketIdFactory bucketIdFactory; + AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory); auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" @@ -50,7 +52,8 @@ TEST_F(ProcessAllHandlerTest, remove_location) { TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { document::BucketId bucketId(16, 4); - ProcessAllHandler handler(getEnv(), getPersistenceProvider()); + document::BucketIdFactory bucketIdFactory; + AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory); document::TestDocMan docMan; for (int i = 0; i < 10; ++i) { @@ -87,7 +90,8 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket); - ProcessAllHandler handler(getEnv(), getPersistenceProvider()); + document::BucketIdFactory bucketIdFactory; + AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory); ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", @@ -101,7 +105,8 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket); - ProcessAllHandler handler(getEnv(), getPersistenceProvider()); + document::BucketIdFactory bucketIdFactory; + AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory); ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index bc6e67578c0..aac2b0748c4 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -4,16 +4,21 @@ #include "persistenceutil.h" #include "testandsethelper.h" #include "bucketownershipnotifier.h" +#include "bucketprocessor.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/spi/catchresult.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> +#include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/count_down_latch.h> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/log.h> LOG_SETUP(".storage.persistence.asynchandler"); +using vespalib::make_string_short::fmt; namespace storage { namespace { @@ -103,6 +108,20 @@ bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); } +class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor { +public: + using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>; + UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove) + : _to_remove(to_remove) + {} + + void process(spi::DocEntry& entry) override { + _to_remove.emplace_back(entry.getTimestamp(), *entry.getDocumentId()); + } +private: + DocumentIdsAndTimeStamps & _to_remove; +}; + } AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, @@ -378,4 +397,38 @@ AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const ap return true; } +MessageTracker::UP +AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.removeLocation); + + LOG(debug, "RemoveLocation(%s): using selection '%s'", + cmd.getBucketId().toString().c_str(), + cmd.getDocumentSelection().c_str()); + + spi::Bucket bucket(cmd.getBucket()); + UnrevertableRemoveEntryProcessor::DocumentIdsAndTimeStamps to_remove; + UnrevertableRemoveEntryProcessor processor(to_remove); + BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), + std::make_shared<document::DocIdOnly>(), + processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context()); + + std::vector<std::future<std::unique_ptr<spi::Result>>> results; + results.reserve(to_remove.size()); + for (auto & entry : to_remove) { + auto catcher = std::make_unique<spi::CatchResult>(); + results.push_back(catcher->future_result()); + _spi.removeAsync(bucket, entry.first, entry.second, tracker->context(), std::move(catcher)); + } + for (auto & future : results) { + auto result = future.get(); + if (result->getErrorCode() != spi::Result::ErrorType::NONE) { + throw std::runtime_error(fmt("Failed to do remove for removelocation: %s", result->getErrorMessage().c_str())); + } + } + tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, to_remove.size())); + + return tracker; +} + } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index db5a77bfb59..71c54e99f75 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -4,6 +4,7 @@ #include "types.h" #include "messages.h" #include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/removelocation.h> namespace document { class BucketIdFactory; } namespace vespalib { class ISequencedTaskExecutor; } @@ -31,6 +32,7 @@ public: MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index d03c9a6d111..3c981e193f2 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -55,7 +55,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::STATBUCKET_ID: return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker)); case api::MessageType::REMOVELOCATION_ID: - return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker)); case api::MessageType::MERGEBUCKET_ID: return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker)); case api::MessageType::GETBUCKETDIFF_ID: diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index 04afdc9eb85..6a2b5e79450 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -6,7 +6,6 @@ #include <vespa/document/fieldset/fieldsets.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/log/log.h> LOG_SETUP(".persistence.processall"); @@ -21,32 +20,6 @@ ProcessAllHandler::ProcessAllHandler(const PersistenceUtil& env, spi::Persistenc namespace { -class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor { -public: - spi::PersistenceProvider& _provider; - const spi::Bucket& _bucket; - spi::Context& _context; - uint32_t _n_removed; - - UnrevertableRemoveEntryProcessor( - spi::PersistenceProvider& provider, - const spi::Bucket& bucket, - spi::Context& context) - : _provider(provider), - _bucket(bucket), - _context(context), - _n_removed(0) - {} - - void process(spi::DocEntry& entry) override { - spi::RemoveResult removeResult = _provider.remove(_bucket, entry.getTimestamp(), *entry.getDocumentId(),_context); - if (removeResult.getErrorCode() != spi::Result::ErrorType::NONE) { - throw std::runtime_error(vespalib::make_string("Failed to do remove for removelocation: %s", removeResult.getErrorMessage().c_str())); - } - ++_n_removed; - } -}; - class StatEntryProcessor : public BucketProcessor::EntryProcessor { public: std::ostream& ost; @@ -75,26 +48,6 @@ public: } MessageTracker::UP -ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.removeLocation); - - LOG(debug, "RemoveLocation(%s): using selection '%s'", - cmd.getBucketId().toString().c_str(), - cmd.getDocumentSelection().c_str()); - - spi::Bucket bucket(cmd.getBucket()); - UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context()); - BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), - std::make_shared<document::DocIdOnly>(), - processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context()); - - tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed)); - - return tracker; -} - -MessageTracker::UP ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.statBucket); diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h index 7a6156a95c7..1dfd286cd2b 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.h +++ b/storage/src/vespa/storage/persistence/processallhandler.h @@ -3,7 +3,6 @@ #include "types.h" #include "messages.h" -#include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> namespace storage { @@ -14,7 +13,6 @@ class PersistenceUtil; class ProcessAllHandler : public Types { public: ProcessAllHandler(const PersistenceUtil&, spi::PersistenceProvider&); - MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const; MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const; private: const PersistenceUtil & _env; |