summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp10
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp120
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h16
-rw-r--r--storage/src/vespa/storage/persistence/bucketprocessor.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/bucketprocessor.h4
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/splitbitdetector.cpp14
7 files changed, 129 insertions, 43 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 2b0218bf20c..2434c6d3e0d 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -1409,6 +1409,11 @@ TEST_F(FileStorManagerTest, delete_bucket) {
ASSERT_TRUE(reply.get());
EXPECT_EQ(ReturnCode(ReturnCode::OK), reply->getResult());
}
+ // Bucket should be removed from DB
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
+ EXPECT_FALSE(entry.exists());
+ }
}
TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) {
@@ -1452,6 +1457,11 @@ TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) {
EXPECT_EQ(ReturnCode::REJECTED, reply->getResult().getResult());
EXPECT_EQ(bucketInfo, reply->getBucketInfo());
}
+ // Bucket should still exist in DB
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
+ EXPECT_TRUE(entry.exists());
+ }
}
/**
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 2e7d0caf151..bb3952c8c33 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -7,6 +7,7 @@
#include "bucketprocessor.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/persistence/spi/docentry.h>
+#include <vespa/persistence/spi/doctype_gid_and_timestamp.h>
#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
@@ -71,7 +72,7 @@ private:
template<class FunctionType>
std::unique_ptr<ResultTask>
-makeResultTask(FunctionType &&function) {
+makeResultTask(FunctionType&& function) {
return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>>
(std::forward<FunctionType>(function));
}
@@ -119,8 +120,8 @@ public:
: _to_remove(to_remove)
{}
- void process(spi::DocEntry& entry) override {
- _to_remove.emplace_back(*entry.getDocumentId(), entry.getTimestamp());
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ _to_remove.emplace_back(*entry->getDocumentId(), entry->getTimestamp());
}
private:
DocumentIdsAndTimeStamps & _to_remove;
@@ -202,6 +203,28 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::
return tracker;
}
+void
+AsyncHandler::on_delete_bucket_complete(const document::Bucket& bucket) const {
+ StorBucketDatabase& db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exists() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+}
+
MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
@@ -216,33 +239,84 @@ AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::
return tracker;
}
- auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) {
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket = cmd.getBucket()]([[maybe_unused]] spi::Result::UP ignored) {
// TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
- (void) ignored;
- StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
- StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
- if (entry.exists() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- bucket.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
+ on_delete_bucket_complete(bucket);
tracker->sendReply();
});
_spi.deleteBucketAsync(bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return tracker;
}
+namespace {
+
+class GatherBucketMetadata : public BucketProcessor::EntryProcessor {
+ std::vector<std::unique_ptr<spi::DocEntry>>& _entries;
+public:
+ explicit GatherBucketMetadata(std::vector<std::unique_ptr<spi::DocEntry>>& entries) noexcept
+ : _entries(entries)
+ {
+ }
+ ~GatherBucketMetadata() override = default;
+
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ _entries.emplace_back(std::move(entry));
+ }
+};
+
+}
+
+MessageTracker::UP
+AsyncHandler::handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeleteBucket(%s) (with throttling)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket spi_bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(spi_bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+
+ std::vector<std::unique_ptr<spi::DocEntry>> meta_entries;
+ {
+ GatherBucketMetadata meta_proc(meta_entries);
+ auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
+ // Note: we only explicitly remove Put entries; tombstones are expected to be
+ // cheap and will be purged as part of the subsequent DeleteBucket operation.
+ // (Additionally, the SPI does not expose a way to remove a remove...)
+ BucketProcessor::iterateAll(_spi, spi_bucket, "true",
+ std::make_shared<document::NoFields>(),
+ meta_proc, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ }
+ auto invoke_delete_on_zero_refs = vespalib::makeSharedLambdaCallback([this, spi_bucket, bucket = cmd.getBucket(), tracker = std::move(tracker)]() mutable {
+ LOG(debug, "%s: about to invoke deleteBucketAsync", bucket.toString().c_str());
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket]([[maybe_unused]] spi::Result::UP ignored) {
+ LOG(debug, "%s: deleteBucket callback invoked; sending reply", bucket.toString().c_str());
+ on_delete_bucket_complete(bucket);
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(spi_bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket.getBucketId(), std::move(task)));
+ });
+ auto& throttler = _env._fileStorHandler.operation_throttler();
+ for (auto& meta : meta_entries) {
+ auto token = throttler.blocking_acquire_one();
+ std::vector<spi::DocTypeGidAndTimestamp> to_remove = {{meta->getDocumentType(), meta->getGid(), meta->getTimestamp()}};
+ auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token), invoke_delete_on_zero_refs]([[maybe_unused]] spi::Result::UP ignored) {
+ LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str());
+ // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly.
+ });
+ LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %zu)", cmd.getBucket().toString().c_str(),
+ vespalib::string(meta->getDocumentType()).c_str(), meta->getGid().toString().c_str(), meta->getTimestamp().getValue());
+ _spi.removeByGidAsync(spi_bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ }
+ // Actual bucket deletion happens when all remove ops have ACKed and dropped their refs to the destructor-invoked
+ // deleteBucket dispatcher. Note: this works transparently when the bucket is empty (no refs; happens immediately).
+ return tracker;
+}
+
MessageTracker::UP
AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
{
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 5ff20c6b9c5..1433176036b 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -24,22 +24,24 @@ class MessageTracker;
class AsyncHandler {
using MessageTrackerUP = std::unique_ptr<MessageTracker>;
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
- vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier&,
+ vespalib::ISequencedTaskExecutor& executor, const document::BucketIdFactory& bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const;
- static bool is_async_unconditional_message(const api::StorageMessage & cmd) noexcept;
+ static bool is_async_unconditional_message(const api::StorageMessage& cmd) noexcept;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- static bool tasConditionExists(const api::TestAndSetCommand & cmd);
- bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- spi::Context & context, bool missingDocumentImpliesMatch = false) const;
+ [[nodiscard]] bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
+ static bool tasConditionExists(const api::TestAndSetCommand& cmd);
+ bool tasConditionMatches(const api::TestAndSetCommand& cmd, MessageTracker& tracker,
+ spi::Context& context, bool missingDocumentImpliesMatch = false) const;
+ void on_delete_bucket_complete(const document::Bucket& bucket) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
BucketOwnershipNotifier & _bucketOwnershipNotifier;
diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.cpp b/storage/src/vespa/storage/persistence/bucketprocessor.cpp
index 99654b7c16a..84df361d2e1 100644
--- a/storage/src/vespa/storage/persistence/bucketprocessor.cpp
+++ b/storage/src/vespa/storage/persistence/bucketprocessor.cpp
@@ -2,6 +2,7 @@
#include "bucketprocessor.h"
#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/persistence/spi/docentry.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <cassert>
@@ -68,8 +69,8 @@ BucketProcessor::iterateAll(spi::PersistenceProvider& provider,
throw std::runtime_error(ss.str());
}
- for (size_t i = 0; i < result.getEntries().size(); ++i) {
- processor.process(*result.getEntries()[i]);
+ for (auto& entry : result.steal_entries()) {
+ processor.process(std::move(entry));
}
if (result.isCompleted()) {
diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.h b/storage/src/vespa/storage/persistence/bucketprocessor.h
index 3c2383967c8..d295566e0b1 100644
--- a/storage/src/vespa/storage/persistence/bucketprocessor.h
+++ b/storage/src/vespa/storage/persistence/bucketprocessor.h
@@ -23,8 +23,8 @@ class BucketProcessor
public:
class EntryProcessor {
public:
- virtual ~EntryProcessor() {};
- virtual void process(spi::DocEntry&) = 0;
+ virtual ~EntryProcessor() = default;
+ virtual void process(std::unique_ptr<spi::DocEntry>) = 0;
};
static void iterateAll(spi::PersistenceProvider&,
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index afab5391e2d..6eb327f021e 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -29,7 +29,8 @@ public:
explicit StatEntryProcessor(std::ostream& o)
: ost(o) {};
- void process(spi::DocEntry& e) override {
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ const auto& e = *entry;
ost << " Timestamp: " << e.getTimestamp() << ", ";
if (e.getDocument() != nullptr) {
ost << "Doc(" << e.getDocument()->getId() << ")"
diff --git a/storage/src/vespa/storage/persistence/splitbitdetector.cpp b/storage/src/vespa/storage/persistence/splitbitdetector.cpp
index 0a287258356..2d338be967a 100644
--- a/storage/src/vespa/storage/persistence/splitbitdetector.cpp
+++ b/storage/src/vespa/storage/persistence/splitbitdetector.cpp
@@ -56,17 +56,17 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor {
};
std::vector<DocInfo> _firstDocs;
- BucketVisitor(const document::BucketIdFactory& factory);
- ~BucketVisitor();
+ explicit BucketVisitor(const document::BucketIdFactory& factory);
+ ~BucketVisitor() override;
- void process(spi::DocEntry& entry) override {
- assert(entry.getDocumentId());
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ assert(entry->getDocumentId());
++_docCount;
- const document::DocumentId& id(*entry.getDocumentId());
+ const document::DocumentId& id(*entry->getDocumentId());
document::BucketId bucket = _factory.getBucketId(id);
if (_firstDocs.size() < keepFirstCount) {
- _firstDocs.push_back(DocInfo(entry.getTimestamp(), id, bucket));
+ _firstDocs.emplace_back(entry->getTimestamp(), id, bucket);
}
if (_refBucket.getRawId() == 0) {
@@ -83,8 +83,6 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor {
_conflictId = id;
_conflictBucket = bucket;
}
-
- return;
}
void printEntrySummary(std::ostream& out) {