summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-11-09 15:56:30 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-11-10 13:10:59 +0000
commitb4ca69ae534534f4f3c36b96aa2423f93001b05f (patch)
tree5d636274dcfaf5b27e10baa52ba1661637a21ac3 /storage
parentd1a69ad4cf19eae5efb7ff5ba3854d33551221bc (diff)
Implement DeleteBucket with throttled per-document async removal
Previous (legacy) behavior was to immediately async schedule a full bucket deletion in the persistence backend, which incurs a very disproportionate cost when documents are backed by many and/or heavy indexes (such as HNSW). This risked swamping the backend with tens to hundreds of thousands of concurrent document deletes. New behavior splits deletion into three phases: 1. Metadata enumeration for all documents present in the bucket 2. Persistence-throttled async remove _per individual document_ that was returned in the iteration result. This blocks the persistence thread (by design) if the throttling window is not sufficiently large to accommodate all pending deletes. 3. Once all async removes have been ACKed, schedule the actual `DeleteBucket` operation towards the backend. This will clean up any remaining (cheap) tombstone entries as well as the metadata store. Operation reply is sent as before once the delete has completed.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp10
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp120
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h16
-rw-r--r--storage/src/vespa/storage/persistence/bucketprocessor.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/bucketprocessor.h4
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/splitbitdetector.cpp14
7 files changed, 129 insertions, 43 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 2b0218bf20c..2434c6d3e0d 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -1409,6 +1409,11 @@ TEST_F(FileStorManagerTest, delete_bucket) {
ASSERT_TRUE(reply.get());
EXPECT_EQ(ReturnCode(ReturnCode::OK), reply->getResult());
}
+ // Bucket should be removed from DB
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
+ EXPECT_FALSE(entry.exists());
+ }
}
TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) {
@@ -1452,6 +1457,11 @@ TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) {
EXPECT_EQ(ReturnCode::REJECTED, reply->getResult().getResult());
EXPECT_EQ(bucketInfo, reply->getBucketInfo());
}
+ // Bucket should still exist in DB
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
+ EXPECT_TRUE(entry.exists());
+ }
}
/**
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 2e7d0caf151..bb3952c8c33 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -7,6 +7,7 @@
#include "bucketprocessor.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/persistence/spi/docentry.h>
+#include <vespa/persistence/spi/doctype_gid_and_timestamp.h>
#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
@@ -71,7 +72,7 @@ private:
template<class FunctionType>
std::unique_ptr<ResultTask>
-makeResultTask(FunctionType &&function) {
+makeResultTask(FunctionType&& function) {
return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>>
(std::forward<FunctionType>(function));
}
@@ -119,8 +120,8 @@ public:
: _to_remove(to_remove)
{}
- void process(spi::DocEntry& entry) override {
- _to_remove.emplace_back(*entry.getDocumentId(), entry.getTimestamp());
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ _to_remove.emplace_back(*entry->getDocumentId(), entry->getTimestamp());
}
private:
DocumentIdsAndTimeStamps & _to_remove;
@@ -202,6 +203,28 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::
return tracker;
}
+void
+AsyncHandler::on_delete_bucket_complete(const document::Bucket& bucket) const {
+ StorBucketDatabase& db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exists() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+}
+
MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
@@ -216,33 +239,84 @@ AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::
return tracker;
}
- auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) {
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket = cmd.getBucket()]([[maybe_unused]] spi::Result::UP ignored) {
// TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
- (void) ignored;
- StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
- StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
- if (entry.exists() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- bucket.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
+ on_delete_bucket_complete(bucket);
tracker->sendReply();
});
_spi.deleteBucketAsync(bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return tracker;
}
+namespace {
+
+class GatherBucketMetadata : public BucketProcessor::EntryProcessor {
+ std::vector<std::unique_ptr<spi::DocEntry>>& _entries;
+public:
+ explicit GatherBucketMetadata(std::vector<std::unique_ptr<spi::DocEntry>>& entries) noexcept
+ : _entries(entries)
+ {
+ }
+ ~GatherBucketMetadata() override = default;
+
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ _entries.emplace_back(std::move(entry));
+ }
+};
+
+}
+
+MessageTracker::UP
+AsyncHandler::handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeleteBucket(%s) (with throttling)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket spi_bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(spi_bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+
+ std::vector<std::unique_ptr<spi::DocEntry>> meta_entries;
+ {
+ GatherBucketMetadata meta_proc(meta_entries);
+ auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
+ // Note: we only explicitly remove Put entries; tombstones are expected to be
+ // cheap and will be purged as part of the subsequent DeleteBucket operation.
+ // (Additionally, the SPI does not expose a way to remove a remove...)
+ BucketProcessor::iterateAll(_spi, spi_bucket, "true",
+ std::make_shared<document::NoFields>(),
+ meta_proc, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ }
+ auto invoke_delete_on_zero_refs = vespalib::makeSharedLambdaCallback([this, spi_bucket, bucket = cmd.getBucket(), tracker = std::move(tracker)]() mutable {
+ LOG(debug, "%s: about to invoke deleteBucketAsync", bucket.toString().c_str());
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket]([[maybe_unused]] spi::Result::UP ignored) {
+ LOG(debug, "%s: deleteBucket callback invoked; sending reply", bucket.toString().c_str());
+ on_delete_bucket_complete(bucket);
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(spi_bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket.getBucketId(), std::move(task)));
+ });
+ auto& throttler = _env._fileStorHandler.operation_throttler();
+ for (auto& meta : meta_entries) {
+ auto token = throttler.blocking_acquire_one();
+ std::vector<spi::DocTypeGidAndTimestamp> to_remove = {{meta->getDocumentType(), meta->getGid(), meta->getTimestamp()}};
+ auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token), invoke_delete_on_zero_refs]([[maybe_unused]] spi::Result::UP ignored) {
+ LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str());
+ // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly.
+ });
+ LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %zu)", cmd.getBucket().toString().c_str(),
+ vespalib::string(meta->getDocumentType()).c_str(), meta->getGid().toString().c_str(), meta->getTimestamp().getValue());
+ _spi.removeByGidAsync(spi_bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ }
+ // Actual bucket deletion happens when all remove ops have ACKed and dropped their refs to the destructor-invoked
+ // deleteBucket dispatcher. Note: this works transparently when the bucket is empty (no refs; happens immediately).
+ return tracker;
+}
+
MessageTracker::UP
AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
{
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 5ff20c6b9c5..1433176036b 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -24,22 +24,24 @@ class MessageTracker;
class AsyncHandler {
using MessageTrackerUP = std::unique_ptr<MessageTracker>;
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
- vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier&,
+ vespalib::ISequencedTaskExecutor& executor, const document::BucketIdFactory& bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const;
- static bool is_async_unconditional_message(const api::StorageMessage & cmd) noexcept;
+ static bool is_async_unconditional_message(const api::StorageMessage& cmd) noexcept;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- static bool tasConditionExists(const api::TestAndSetCommand & cmd);
- bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- spi::Context & context, bool missingDocumentImpliesMatch = false) const;
+ [[nodiscard]] bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
+ static bool tasConditionExists(const api::TestAndSetCommand& cmd);
+ bool tasConditionMatches(const api::TestAndSetCommand& cmd, MessageTracker& tracker,
+ spi::Context& context, bool missingDocumentImpliesMatch = false) const;
+ void on_delete_bucket_complete(const document::Bucket& bucket) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
BucketOwnershipNotifier & _bucketOwnershipNotifier;
diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.cpp b/storage/src/vespa/storage/persistence/bucketprocessor.cpp
index 99654b7c16a..84df361d2e1 100644
--- a/storage/src/vespa/storage/persistence/bucketprocessor.cpp
+++ b/storage/src/vespa/storage/persistence/bucketprocessor.cpp
@@ -2,6 +2,7 @@
#include "bucketprocessor.h"
#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/persistence/spi/docentry.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <cassert>
@@ -68,8 +69,8 @@ BucketProcessor::iterateAll(spi::PersistenceProvider& provider,
throw std::runtime_error(ss.str());
}
- for (size_t i = 0; i < result.getEntries().size(); ++i) {
- processor.process(*result.getEntries()[i]);
+ for (auto& entry : result.steal_entries()) {
+ processor.process(std::move(entry));
}
if (result.isCompleted()) {
diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.h b/storage/src/vespa/storage/persistence/bucketprocessor.h
index 3c2383967c8..d295566e0b1 100644
--- a/storage/src/vespa/storage/persistence/bucketprocessor.h
+++ b/storage/src/vespa/storage/persistence/bucketprocessor.h
@@ -23,8 +23,8 @@ class BucketProcessor
public:
class EntryProcessor {
public:
- virtual ~EntryProcessor() {};
- virtual void process(spi::DocEntry&) = 0;
+ virtual ~EntryProcessor() = default;
+ virtual void process(std::unique_ptr<spi::DocEntry>) = 0;
};
static void iterateAll(spi::PersistenceProvider&,
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index afab5391e2d..6eb327f021e 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -29,7 +29,8 @@ public:
explicit StatEntryProcessor(std::ostream& o)
: ost(o) {};
- void process(spi::DocEntry& e) override {
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ const auto& e = *entry;
ost << " Timestamp: " << e.getTimestamp() << ", ";
if (e.getDocument() != nullptr) {
ost << "Doc(" << e.getDocument()->getId() << ")"
diff --git a/storage/src/vespa/storage/persistence/splitbitdetector.cpp b/storage/src/vespa/storage/persistence/splitbitdetector.cpp
index 0a287258356..2d338be967a 100644
--- a/storage/src/vespa/storage/persistence/splitbitdetector.cpp
+++ b/storage/src/vespa/storage/persistence/splitbitdetector.cpp
@@ -56,17 +56,17 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor {
};
std::vector<DocInfo> _firstDocs;
- BucketVisitor(const document::BucketIdFactory& factory);
- ~BucketVisitor();
+ explicit BucketVisitor(const document::BucketIdFactory& factory);
+ ~BucketVisitor() override;
- void process(spi::DocEntry& entry) override {
- assert(entry.getDocumentId());
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ assert(entry->getDocumentId());
++_docCount;
- const document::DocumentId& id(*entry.getDocumentId());
+ const document::DocumentId& id(*entry->getDocumentId());
document::BucketId bucket = _factory.getBucketId(id);
if (_firstDocs.size() < keepFirstCount) {
- _firstDocs.push_back(DocInfo(entry.getTimestamp(), id, bucket));
+ _firstDocs.emplace_back(entry->getTimestamp(), id, bucket);
}
if (_refBucket.getRawId() == 0) {
@@ -83,8 +83,6 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor {
_conflictId = id;
_conflictBucket = bucket;
}
-
- return;
}
void printEntrySummary(std::ostream& out) {