summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-17 18:31:29 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-11-17 18:31:29 +0000
commit8e5767e8d878d59c082ade3ef51f991a81338220 (patch)
tree5c86157e0449947fafd57c0a904018bc9173fcb5
parentab3518e8b3a4caf742e12a134c4fb1d2bbf3c293 (diff)
Move removeLocation over to Asynchandler and issue all removes for one bucket before waiting for the replies.
Prepare RemoveResult to contain more replies.
-rw-r--r--persistence/src/vespa/persistence/spi/result.h34
-rw-r--r--storage/src/tests/persistence/processalltest.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp53
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp47
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.h2
7 files changed, 82 insertions, 71 deletions
diff --git a/persistence/src/vespa/persistence/spi/result.h b/persistence/src/vespa/persistence/spi/result.h
index 4ec323c6b47..73838e99a18 100644
--- a/persistence/src/vespa/persistence/spi/result.h
+++ b/persistence/src/vespa/persistence/spi/result.h
@@ -25,14 +25,15 @@ public:
/**
* Constructor to use for a result where there is no error.
*/
- Result() : _errorCode(ErrorType::NONE), _errorMessage() {}
+ Result() noexcept : _errorCode(ErrorType::NONE), _errorMessage() {}
/**
* Constructor to use when an error has been detected.
*/
- Result(ErrorType error, const vespalib::string& errorMessage)
+ Result(ErrorType error, const vespalib::string& errorMessage) noexcept
: _errorCode(error),
- _errorMessage(errorMessage) {}
+ _errorMessage(errorMessage)
+ {}
Result(const Result &);
Result & operator = (const Result &);
@@ -130,21 +131,20 @@ public:
* The service layer will not update the bucket information in this case,
* so it should not be returned either.
*/
- RemoveResult(ErrorType error, const vespalib::string& errorMessage)
+ RemoveResult(ErrorType error, const vespalib::string& errorMessage) noexcept
: Result(error, errorMessage),
- _wasFound(false)
+ _numRemoved(0)
{ }
- /**
- * Constructor to use when the remove was successful.
- */
- RemoveResult(bool foundDocument)
- : _wasFound(foundDocument) { }
-
- bool wasFound() const { return _wasFound; }
+ explicit RemoveResult(bool found) noexcept
+ : RemoveResult(found ? 1u : 0u) { }
+ explicit RemoveResult(uint32_t numRemoved) noexcept
+ : _numRemoved(numRemoved) { }
+ bool wasFound() const { return _numRemoved > 0; }
+ uint32_t num_removed() const { return _numRemoved; }
private:
- bool _wasFound;
+ uint32_t _numRemoved;
};
class GetResult : public Result {
@@ -257,14 +257,14 @@ public:
/**
* Constructor used when there was an error creating the iterator.
*/
- CreateIteratorResult(ErrorType error, const vespalib::string& errorMessage)
+ CreateIteratorResult(ErrorType error, const vespalib::string& errorMessage) noexcept
: Result(error, errorMessage),
_iterator(0) { }
/**
* Constructor used when the iterator state was successfully created.
*/
- CreateIteratorResult(const IteratorId& id)
+ CreateIteratorResult(const IteratorId& id) noexcept
: _iterator(id)
{ }
@@ -299,8 +299,8 @@ public:
{ }
IterateResult(const IterateResult &) = delete;
- IterateResult(IterateResult &&rhs) = default;
- IterateResult &operator=(IterateResult &&rhs) = default;
+ IterateResult(IterateResult &&rhs) noexcept = default;
+ IterateResult &operator=(IterateResult &&rhs) noexcept = default;
~IterateResult();
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 80047a1797c..18242e62873 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -2,6 +2,7 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/storage/persistence/processallhandler.h>
+#include <vespa/storage/persistence/asynchandler.h>
#include <vespa/storage/persistence/messages.h>
#include <tests/persistence/persistencetestutils.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -36,7 +37,8 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket);
- ProcessAllHandler handler(getEnv(), getPersistenceProvider());
+ document::BucketIdFactory bucketIdFactory;
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory);
auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n"
@@ -50,7 +52,8 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
document::BucketId bucketId(16, 4);
- ProcessAllHandler handler(getEnv(), getPersistenceProvider());
+ document::BucketIdFactory bucketIdFactory;
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory);
document::TestDocMan docMan;
for (int i = 0; i < 10; ++i) {
@@ -87,7 +90,8 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket);
- ProcessAllHandler handler(getEnv(), getPersistenceProvider());
+ document::BucketIdFactory bucketIdFactory;
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory);
ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
@@ -101,7 +105,8 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket);
- ProcessAllHandler handler(getEnv(), getPersistenceProvider());
+ document::BucketIdFactory bucketIdFactory;
+ AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, bucketIdFactory);
ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index bc6e67578c0..aac2b0748c4 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -4,16 +4,21 @@
#include "persistenceutil.h"
#include "testandsethelper.h"
#include "bucketownershipnotifier.h"
+#include "bucketprocessor.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/count_down_latch.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/log.h>
LOG_SETUP(".storage.persistence.asynchandler");
+using vespalib::make_string_short::fmt;
namespace storage {
namespace {
@@ -103,6 +108,20 @@ bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo
return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
}
+class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor {
+public:
+ using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>;
+ UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove)
+ : _to_remove(to_remove)
+ {}
+
+ void process(spi::DocEntry& entry) override {
+ _to_remove.emplace_back(entry.getTimestamp(), *entry.getDocumentId());
+ }
+private:
+ DocumentIdsAndTimeStamps & _to_remove;
+};
+
}
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
@@ -378,4 +397,38 @@ AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const ap
return true;
}
+MessageTracker::UP
+AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.removeLocation);
+
+ LOG(debug, "RemoveLocation(%s): using selection '%s'",
+ cmd.getBucketId().toString().c_str(),
+ cmd.getDocumentSelection().c_str());
+
+ spi::Bucket bucket(cmd.getBucket());
+ UnrevertableRemoveEntryProcessor::DocumentIdsAndTimeStamps to_remove;
+ UnrevertableRemoveEntryProcessor processor(to_remove);
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ std::make_shared<document::DocIdOnly>(),
+ processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context());
+
+ std::vector<std::future<std::unique_ptr<spi::Result>>> results;
+ results.reserve(to_remove.size());
+ for (auto & entry : to_remove) {
+ auto catcher = std::make_unique<spi::CatchResult>();
+ results.push_back(catcher->future_result());
+ _spi.removeAsync(bucket, entry.first, entry.second, tracker->context(), std::move(catcher));
+ }
+ for (auto & future : results) {
+ auto result = future.get();
+ if (result->getErrorCode() != spi::Result::ErrorType::NONE) {
+ throw std::runtime_error(fmt("Failed to do remove for removelocation: %s", result->getErrorMessage().c_str()));
+ }
+ }
+ tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, to_remove.size()));
+
+ return tracker;
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index db5a77bfb59..71c54e99f75 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -4,6 +4,7 @@
#include "types.h"
#include "messages.h"
#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/removelocation.h>
namespace document { class BucketIdFactory; }
namespace vespalib { class ISequencedTaskExecutor; }
@@ -31,6 +32,7 @@ public:
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index d03c9a6d111..3c981e193f2 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -55,7 +55,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::STATBUCKET_ID:
return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVELOCATION_ID:
- return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker));
case api::MessageType::MERGEBUCKET_ID:
return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker));
case api::MessageType::GETBUCKETDIFF_ID:
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index 04afdc9eb85..6a2b5e79450 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -6,7 +6,6 @@
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.processall");
@@ -21,32 +20,6 @@ ProcessAllHandler::ProcessAllHandler(const PersistenceUtil& env, spi::Persistenc
namespace {
-class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor {
-public:
- spi::PersistenceProvider& _provider;
- const spi::Bucket& _bucket;
- spi::Context& _context;
- uint32_t _n_removed;
-
- UnrevertableRemoveEntryProcessor(
- spi::PersistenceProvider& provider,
- const spi::Bucket& bucket,
- spi::Context& context)
- : _provider(provider),
- _bucket(bucket),
- _context(context),
- _n_removed(0)
- {}
-
- void process(spi::DocEntry& entry) override {
- spi::RemoveResult removeResult = _provider.remove(_bucket, entry.getTimestamp(), *entry.getDocumentId(),_context);
- if (removeResult.getErrorCode() != spi::Result::ErrorType::NONE) {
- throw std::runtime_error(vespalib::make_string("Failed to do remove for removelocation: %s", removeResult.getErrorMessage().c_str()));
- }
- ++_n_removed;
- }
-};
-
class StatEntryProcessor : public BucketProcessor::EntryProcessor {
public:
std::ostream& ost;
@@ -75,26 +48,6 @@ public:
}
MessageTracker::UP
-ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.removeLocation);
-
- LOG(debug, "RemoveLocation(%s): using selection '%s'",
- cmd.getBucketId().toString().c_str(),
- cmd.getDocumentSelection().c_str());
-
- spi::Bucket bucket(cmd.getBucket());
- UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context());
- BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
- std::make_shared<document::DocIdOnly>(),
- processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context());
-
- tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed));
-
- return tracker;
-}
-
-MessageTracker::UP
ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.statBucket);
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 7a6156a95c7..1dfd286cd2b 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -3,7 +3,6 @@
#include "types.h"
#include "messages.h"
-#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
namespace storage {
@@ -14,7 +13,6 @@ class PersistenceUtil;
class ProcessAllHandler : public Types {
public:
ProcessAllHandler(const PersistenceUtil&, spi::PersistenceProvider&);
- MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const;
MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const;
private:
const PersistenceUtil & _env;