diff options
Diffstat (limited to 'storage')
7 files changed, 129 insertions, 43 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 2b0218bf20c..2434c6d3e0d 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -1409,6 +1409,11 @@ TEST_F(FileStorManagerTest, delete_bucket) { ASSERT_TRUE(reply.get()); EXPECT_EQ(ReturnCode(ReturnCode::OK), reply->getResult()); } + // Bucket should be removed from DB + { + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo")); + EXPECT_FALSE(entry.exists()); + } } TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) { @@ -1452,6 +1457,11 @@ TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) { EXPECT_EQ(ReturnCode::REJECTED, reply->getResult().getResult()); EXPECT_EQ(bucketInfo, reply->getBucketInfo()); } + // Bucket should still exist in DB + { + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo")); + EXPECT_TRUE(entry.exists()); + } } /** diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 2e7d0caf151..bb3952c8c33 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -7,6 +7,7 @@ #include "bucketprocessor.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/spi/docentry.h> +#include <vespa/persistence/spi/doctype_gid_and_timestamp.h> #include <vespa/persistence/spi/catchresult.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> @@ -71,7 +72,7 @@ private: template<class FunctionType> std::unique_ptr<ResultTask> -makeResultTask(FunctionType &&function) { +makeResultTask(FunctionType&& function) { return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>> (std::forward<FunctionType>(function)); } @@ -119,8 +120,8 @@ public: : _to_remove(to_remove) {} - void process(spi::DocEntry& entry) override { - _to_remove.emplace_back(*entry.getDocumentId(), entry.getTimestamp()); + void process(std::unique_ptr<spi::DocEntry> entry) override { + _to_remove.emplace_back(*entry->getDocumentId(), entry->getTimestamp()); } private: DocumentIdsAndTimeStamps & _to_remove; @@ -202,6 +203,28 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker:: return tracker; } +void +AsyncHandler::on_delete_bucket_complete(const document::Bucket& bucket) const { + StorBucketDatabase& db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); + if (entry.exists() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + bucket.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } +} + MessageTracker::UP AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const { @@ -216,33 +239,84 @@ AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker:: return tracker; } - auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) { + auto task = makeResultTask([this, tracker = std::move(tracker), bucket = cmd.getBucket()]([[maybe_unused]] spi::Result::UP ignored) { // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric - (void) ignored; - StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); - StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); - if (entry.exists() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - bucket.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } + on_delete_bucket_complete(bucket); tracker->sendReply(); }); _spi.deleteBucketAsync(bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return tracker; } +namespace { + +class GatherBucketMetadata : public BucketProcessor::EntryProcessor { + std::vector<std::unique_ptr<spi::DocEntry>>& _entries; +public: + explicit GatherBucketMetadata(std::vector<std::unique_ptr<spi::DocEntry>>& entries) noexcept + : _entries(entries) + { + } + ~GatherBucketMetadata() override = default; + + void process(std::unique_ptr<spi::DocEntry> entry) override { + _entries.emplace_back(std::move(entry)); + } +}; + +} + +MessageTracker::UP +AsyncHandler::handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeleteBucket(%s) (with throttling)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket spi_bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(spi_bucket, cmd.getBucketInfo())) { + return tracker; + } + + std::vector<std::unique_ptr<spi::DocEntry>> meta_entries; + { + GatherBucketMetadata meta_proc(meta_entries); + auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ); + // Note: we only explicitly remove Put entries; tombstones are expected to be + // cheap and will be purged as part of the subsequent DeleteBucket operation. + // (Additionally, the SPI does not expose a way to remove a remove...) + BucketProcessor::iterateAll(_spi, spi_bucket, "true", + std::make_shared<document::NoFields>(), + meta_proc, spi::NEWEST_DOCUMENT_ONLY, tracker->context()); + } + auto invoke_delete_on_zero_refs = vespalib::makeSharedLambdaCallback([this, spi_bucket, bucket = cmd.getBucket(), tracker = std::move(tracker)]() mutable { + LOG(debug, "%s: about to invoke deleteBucketAsync", bucket.toString().c_str()); + auto task = makeResultTask([this, tracker = std::move(tracker), bucket]([[maybe_unused]] spi::Result::UP ignored) { + LOG(debug, "%s: deleteBucket callback invoked; sending reply", bucket.toString().c_str()); + on_delete_bucket_complete(bucket); + tracker->sendReply(); + }); + _spi.deleteBucketAsync(spi_bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket.getBucketId(), std::move(task))); + }); + auto& throttler = _env._fileStorHandler.operation_throttler(); + for (auto& meta : meta_entries) { + auto token = throttler.blocking_acquire_one(); + std::vector<spi::DocTypeGidAndTimestamp> to_remove = {{meta->getDocumentType(), meta->getGid(), meta->getTimestamp()}}; + auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token), invoke_delete_on_zero_refs]([[maybe_unused]] spi::Result::UP ignored) { + LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str()); + // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly. + }); + LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %zu)", cmd.getBucket().toString().c_str(), + vespalib::string(meta->getDocumentType()).c_str(), meta->getGid().toString().c_str(), meta->getTimestamp().getValue()); + _spi.removeByGidAsync(spi_bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + } + // Actual bucket deletion happens when all remove ops have ACKed and dropped their refs to the destructor-invoked + // deleteBucket dispatcher. Note: this works transparently when the bucket is empty (no refs; happens immediately). + return tracker; +} + MessageTracker::UP AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const { diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 5ff20c6b9c5..1433176036b 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -24,22 +24,24 @@ class MessageTracker; class AsyncHandler { using MessageTrackerUP = std::unique_ptr<MessageTracker>; public: - AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &, - vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory); + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier&, + vespalib::ISequencedTaskExecutor& executor, const document::BucketIdFactory& bucketIdFactory); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const; - static bool is_async_unconditional_message(const api::StorageMessage & cmd) noexcept; + static bool is_async_unconditional_message(const api::StorageMessage& cmd) noexcept; private: - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; - static bool tasConditionExists(const api::TestAndSetCommand & cmd); - bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, - spi::Context & context, bool missingDocumentImpliesMatch = false) const; + [[nodiscard]] bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; + static bool tasConditionExists(const api::TestAndSetCommand& cmd); + bool tasConditionMatches(const api::TestAndSetCommand& cmd, MessageTracker& tracker, + spi::Context& context, bool missingDocumentImpliesMatch = false) const; + void on_delete_bucket_complete(const document::Bucket& bucket) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; BucketOwnershipNotifier & _bucketOwnershipNotifier; diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.cpp b/storage/src/vespa/storage/persistence/bucketprocessor.cpp index 99654b7c16a..84df361d2e1 100644 --- a/storage/src/vespa/storage/persistence/bucketprocessor.cpp +++ b/storage/src/vespa/storage/persistence/bucketprocessor.cpp @@ -2,6 +2,7 @@ #include "bucketprocessor.h" #include <vespa/document/fieldset/fieldsets.h> +#include <vespa/persistence/spi/docentry.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/vespalib/stllike/asciistream.h> #include <cassert> @@ -68,8 +69,8 @@ BucketProcessor::iterateAll(spi::PersistenceProvider& provider, throw std::runtime_error(ss.str()); } - for (size_t i = 0; i < result.getEntries().size(); ++i) { - processor.process(*result.getEntries()[i]); + for (auto& entry : result.steal_entries()) { + processor.process(std::move(entry)); } if (result.isCompleted()) { diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.h b/storage/src/vespa/storage/persistence/bucketprocessor.h index 3c2383967c8..d295566e0b1 100644 --- a/storage/src/vespa/storage/persistence/bucketprocessor.h +++ b/storage/src/vespa/storage/persistence/bucketprocessor.h @@ -23,8 +23,8 @@ class BucketProcessor public: class EntryProcessor { public: - virtual ~EntryProcessor() {}; - virtual void process(spi::DocEntry&) = 0; + virtual ~EntryProcessor() = default; + virtual void process(std::unique_ptr<spi::DocEntry>) = 0; }; static void iterateAll(spi::PersistenceProvider&, diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index afab5391e2d..6eb327f021e 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -29,7 +29,8 @@ public: explicit StatEntryProcessor(std::ostream& o) : ost(o) {}; - void process(spi::DocEntry& e) override { + void process(std::unique_ptr<spi::DocEntry> entry) override { + const auto& e = *entry; ost << " Timestamp: " << e.getTimestamp() << ", "; if (e.getDocument() != nullptr) { ost << "Doc(" << e.getDocument()->getId() << ")" diff --git a/storage/src/vespa/storage/persistence/splitbitdetector.cpp b/storage/src/vespa/storage/persistence/splitbitdetector.cpp index 0a287258356..2d338be967a 100644 --- a/storage/src/vespa/storage/persistence/splitbitdetector.cpp +++ b/storage/src/vespa/storage/persistence/splitbitdetector.cpp @@ -56,17 +56,17 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor { }; std::vector<DocInfo> _firstDocs; - BucketVisitor(const document::BucketIdFactory& factory); - ~BucketVisitor(); + explicit BucketVisitor(const document::BucketIdFactory& factory); + ~BucketVisitor() override; - void process(spi::DocEntry& entry) override { - assert(entry.getDocumentId()); + void process(std::unique_ptr<spi::DocEntry> entry) override { + assert(entry->getDocumentId()); ++_docCount; - const document::DocumentId& id(*entry.getDocumentId()); + const document::DocumentId& id(*entry->getDocumentId()); document::BucketId bucket = _factory.getBucketId(id); if (_firstDocs.size() < keepFirstCount) { - _firstDocs.push_back(DocInfo(entry.getTimestamp(), id, bucket)); + _firstDocs.emplace_back(entry->getTimestamp(), id, bucket); } if (_refBucket.getRawId() == 0) { @@ -83,8 +83,6 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor { _conflictId = id; _conflictBucket = bucket; } - - return; } void printEntrySummary(std::ostream& out) { |