diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-11-12 18:40:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-12 18:40:54 +0100 |
commit | d89ef46334c9b6771d18822e23eec1a657952b01 (patch) | |
tree | e78c51e047aed99b6e91835f9bafc0965a8030fd | |
parent | 7dc297a448d472b6a825d21cc80249807b7350ad (diff) | |
parent | f288e1bf9028f418e1be5568829acdceda720e89 (diff) |
Merge pull request #29310 from vespa-engine/vekterli/throttled-delete-bucket
Implement `DeleteBucket` with throttled per-document async removal
11 files changed, 182 insertions, 52 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index c7ac1472d30..090f74dec12 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -114,3 +114,11 @@ async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=DYNAMIC ## Only applies if async_operation_throttler_type == DYNAMIC. ## DEPRECATED! use the async_operation_throttler struct instead async_operation_dynamic_throttling_window_increment int default=20 restart + +## If set, DeleteBucket operations are internally expanded to an individually persistence- +## throttled remove per document stored in the bucket. This makes the cost model of +## executing a DeleteBucket symmetrical with feeding the documents to the bucket in the +## first place. +## +## This is a live config. +use_per_document_throttled_delete_bucket bool default=false diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 2b0218bf20c..96618eb9206 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -45,6 +45,8 @@ using namespace storage::api; using storage::spi::test::makeSpiBucket; using document::test::makeDocumentBucket; using vespalib::IDestructorCallback; +using vespa::config::content::StorFilestorConfig; +using vespa::config::content::StorFilestorConfigBuilder; using namespace ::testing; #define ASSERT_SINGLE_REPLY(replytype, reply, link, time) \ @@ -224,7 +226,6 @@ struct TestFileStorComponents { explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false) : manager(nullptr) { - using vespa::config::content::StorFilestorConfig; auto config_uri = config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId()); auto config = config_from<StorFilestorConfig>(config_uri); auto fsm = std::make_unique<FileStorManager>(*config, test._node->getPersistenceProvider(), @@ -275,7 +276,7 @@ struct PersistenceHandlerComponents : public FileStorHandlerComponents { bucketOwnershipNotifier(component, messageSender), persistenceHandler() { - vespa::config::content::StorFilestorConfig cfg; + StorFilestorConfig cfg; persistenceHandler = std::make_unique<PersistenceHandler>(executor, component, cfg, test._node->getPersistenceProvider(), @@ -310,7 +311,7 @@ FileStorTestBase::TearDown() } struct FileStorManagerTest : public FileStorTestBase { - + void do_test_delete_bucket(bool use_throttled_delete); }; TEST_F(FileStorManagerTest, header_only_put) { @@ -743,7 +744,7 @@ TEST_F(FileStorManagerTest, priority) { ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, c.messageSender); - vespa::config::content::StorFilestorConfig cfg; + StorFilestorConfig cfg; PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), filestorHandler, bucketOwnershipNotifier, *metrics.threads[0]); std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); @@ -1370,8 +1371,14 @@ TEST_F(FileStorManagerTest, remove_location) { } } -TEST_F(FileStorManagerTest, delete_bucket) { +void FileStorManagerTest::do_test_delete_bucket(bool use_throttled_delete) { TestFileStorComponents c(*this); + + auto config_uri = config::ConfigUri(config->getConfigId()); + StorFilestorConfigBuilder my_config(*config_from<StorFilestorConfig>(config_uri)); + my_config.usePerDocumentThrottledDeleteBucket = use_throttled_delete; + c.manager->on_configure(my_config); + auto& top = c.top; // Creating a document to test with document::DocumentId docId("id:crawler:testdoctype1:n=4000:http://www.ntnu.no/"); @@ -1409,6 +1416,20 @@ TEST_F(FileStorManagerTest, delete_bucket) { ASSERT_TRUE(reply.get()); EXPECT_EQ(ReturnCode(ReturnCode::OK), reply->getResult()); } + // Bucket should be removed from DB + { + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo")); + EXPECT_FALSE(entry.exists()); + } +} + +// TODO remove once throttled behavior is the default +TEST_F(FileStorManagerTest, delete_bucket_legacy) { + do_test_delete_bucket(false); +} + +TEST_F(FileStorManagerTest, delete_bucket_throttled) { + do_test_delete_bucket(true); } TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) { @@ -1452,6 +1473,11 @@ TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) { EXPECT_EQ(ReturnCode::REJECTED, reply->getResult().getResult()); EXPECT_EQ(bucketInfo, reply->getBucketInfo()); } + // Bucket should still exist in DB + { + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo")); + EXPECT_TRUE(entry.exists()); + } } /** diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 2e7d0caf151..bb3952c8c33 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -7,6 +7,7 @@ #include "bucketprocessor.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/spi/docentry.h> +#include <vespa/persistence/spi/doctype_gid_and_timestamp.h> #include <vespa/persistence/spi/catchresult.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> @@ -71,7 +72,7 @@ private: template<class FunctionType> std::unique_ptr<ResultTask> -makeResultTask(FunctionType &&function) { +makeResultTask(FunctionType&& function) { return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>> (std::forward<FunctionType>(function)); } @@ -119,8 +120,8 @@ public: : _to_remove(to_remove) {} - void process(spi::DocEntry& entry) override { - _to_remove.emplace_back(*entry.getDocumentId(), entry.getTimestamp()); + void process(std::unique_ptr<spi::DocEntry> entry) override { + _to_remove.emplace_back(*entry->getDocumentId(), entry->getTimestamp()); } private: DocumentIdsAndTimeStamps & _to_remove; @@ -202,6 +203,28 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker:: return tracker; } +void +AsyncHandler::on_delete_bucket_complete(const document::Bucket& bucket) const { + StorBucketDatabase& db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); + if (entry.exists() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + bucket.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } +} + MessageTracker::UP AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const { @@ -216,33 +239,84 @@ AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker:: return tracker; } - auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) { + auto task = makeResultTask([this, tracker = std::move(tracker), bucket = cmd.getBucket()]([[maybe_unused]] spi::Result::UP ignored) { // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric - (void) ignored; - StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); - StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); - if (entry.exists() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - bucket.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } + on_delete_bucket_complete(bucket); tracker->sendReply(); }); _spi.deleteBucketAsync(bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return tracker; } +namespace { + +class GatherBucketMetadata : public BucketProcessor::EntryProcessor { + std::vector<std::unique_ptr<spi::DocEntry>>& _entries; +public: + explicit GatherBucketMetadata(std::vector<std::unique_ptr<spi::DocEntry>>& entries) noexcept + : _entries(entries) + { + } + ~GatherBucketMetadata() override = default; + + void process(std::unique_ptr<spi::DocEntry> entry) override { + _entries.emplace_back(std::move(entry)); + } +}; + +} + +MessageTracker::UP +AsyncHandler::handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeleteBucket(%s) (with throttling)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket spi_bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(spi_bucket, cmd.getBucketInfo())) { + return tracker; + } + + std::vector<std::unique_ptr<spi::DocEntry>> meta_entries; + { + GatherBucketMetadata meta_proc(meta_entries); + auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ); + // Note: we only explicitly remove Put entries; tombstones are expected to be + // cheap and will be purged as part of the subsequent DeleteBucket operation. + // (Additionally, the SPI does not expose a way to remove a remove...) + BucketProcessor::iterateAll(_spi, spi_bucket, "true", + std::make_shared<document::NoFields>(), + meta_proc, spi::NEWEST_DOCUMENT_ONLY, tracker->context()); + } + auto invoke_delete_on_zero_refs = vespalib::makeSharedLambdaCallback([this, spi_bucket, bucket = cmd.getBucket(), tracker = std::move(tracker)]() mutable { + LOG(debug, "%s: about to invoke deleteBucketAsync", bucket.toString().c_str()); + auto task = makeResultTask([this, tracker = std::move(tracker), bucket]([[maybe_unused]] spi::Result::UP ignored) { + LOG(debug, "%s: deleteBucket callback invoked; sending reply", bucket.toString().c_str()); + on_delete_bucket_complete(bucket); + tracker->sendReply(); + }); + _spi.deleteBucketAsync(spi_bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket.getBucketId(), std::move(task))); + }); + auto& throttler = _env._fileStorHandler.operation_throttler(); + for (auto& meta : meta_entries) { + auto token = throttler.blocking_acquire_one(); + std::vector<spi::DocTypeGidAndTimestamp> to_remove = {{meta->getDocumentType(), meta->getGid(), meta->getTimestamp()}}; + auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token), invoke_delete_on_zero_refs]([[maybe_unused]] spi::Result::UP ignored) { + LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str()); + // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly. + }); + LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %zu)", cmd.getBucket().toString().c_str(), + vespalib::string(meta->getDocumentType()).c_str(), meta->getGid().toString().c_str(), meta->getTimestamp().getValue()); + _spi.removeByGidAsync(spi_bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + } + // Actual bucket deletion happens when all remove ops have ACKed and dropped their refs to the destructor-invoked + // deleteBucket dispatcher. Note: this works transparently when the bucket is empty (no refs; happens immediately). + return tracker; +} + MessageTracker::UP AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const { diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 5ff20c6b9c5..1433176036b 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -24,22 +24,24 @@ class MessageTracker; class AsyncHandler { using MessageTrackerUP = std::unique_ptr<MessageTracker>; public: - AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &, - vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory); + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier&, + vespalib::ISequencedTaskExecutor& executor, const document::BucketIdFactory& bucketIdFactory); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const; - static bool is_async_unconditional_message(const api::StorageMessage & cmd) noexcept; + static bool is_async_unconditional_message(const api::StorageMessage& cmd) noexcept; private: - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; - static bool tasConditionExists(const api::TestAndSetCommand & cmd); - bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, - spi::Context & context, bool missingDocumentImpliesMatch = false) const; + [[nodiscard]] bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; + static bool tasConditionExists(const api::TestAndSetCommand& cmd); + bool tasConditionMatches(const api::TestAndSetCommand& cmd, MessageTracker& tracker, + spi::Context& context, bool missingDocumentImpliesMatch = false) const; + void on_delete_bucket_complete(const document::Bucket& bucket) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; BucketOwnershipNotifier & _bucketOwnershipNotifier; diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.cpp b/storage/src/vespa/storage/persistence/bucketprocessor.cpp index 99654b7c16a..84df361d2e1 100644 --- a/storage/src/vespa/storage/persistence/bucketprocessor.cpp +++ b/storage/src/vespa/storage/persistence/bucketprocessor.cpp @@ -2,6 +2,7 @@ #include "bucketprocessor.h" #include <vespa/document/fieldset/fieldsets.h> +#include <vespa/persistence/spi/docentry.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/vespalib/stllike/asciistream.h> #include <cassert> @@ -68,8 +69,8 @@ BucketProcessor::iterateAll(spi::PersistenceProvider& provider, throw std::runtime_error(ss.str()); } - for (size_t i = 0; i < result.getEntries().size(); ++i) { - processor.process(*result.getEntries()[i]); + for (auto& entry : result.steal_entries()) { + processor.process(std::move(entry)); } if (result.isCompleted()) { diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.h b/storage/src/vespa/storage/persistence/bucketprocessor.h index 3c2383967c8..d295566e0b1 100644 --- a/storage/src/vespa/storage/persistence/bucketprocessor.h +++ b/storage/src/vespa/storage/persistence/bucketprocessor.h @@ -23,8 +23,8 @@ class BucketProcessor public: class EntryProcessor { public: - virtual ~EntryProcessor() {}; - virtual void process(spi::DocEntry&) = 0; + virtual ~EntryProcessor() = default; + virtual void process(std::unique_ptr<spi::DocEntry>) = 0; }; static void iterateAll(spi::PersistenceProvider&, diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index e6cd8987c0a..61c7da6e286 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -230,7 +230,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config) numResponseThreads, 10000, true, selectSequencer(_config->responseSequencerType)); assert(_sequencedExecutor); - LOG(spam, "Setting up the disk"); + LOG(spam, "Setting up %u persistence threads", numThreads); for (uint32_t i = 0; i < numThreads; i++) { _threads.push_back(std::make_unique<PersistenceThread>(createRegisteredHandler(_component), *_filestorHandler, i % numStripes, _component)); @@ -241,13 +241,14 @@ FileStorManager::on_configure(const StorFilestorConfig& config) auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size()); _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params); } - // TODO remove once desired dynamic throttling behavior is set in stone + // TODO remove once desired throttling behavior is set in stone { _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling); _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops); std::lock_guard guard(_lock); for (auto& ph : _persistenceHandlers) { ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops); + ph->set_use_per_document_throttled_delete_bucket(config.usePerDocumentThrottledDeleteBucket); } } } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 33171296abf..78ad6eac0e2 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -24,7 +24,8 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen cfg.commonMergeChainOptimalizationMinimumSize), _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), - _simpleHandler(_env, provider, component.getBucketIdFactory()) + _simpleHandler(_env, provider, component.getBucketIdFactory()), + _use_per_op_throttled_delete_bucket(false) { } @@ -68,7 +69,11 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::CREATEBUCKET_ID: return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + if (use_per_op_throttled_delete_bucket()) { + return _asyncHandler.handle_delete_bucket_throttling(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + } else { + return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + } case api::MessageType::JOINBUCKETS_ID: return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: @@ -181,4 +186,14 @@ PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept _mergeHandler.set_throttle_merge_feed_ops(throttle); } +bool +PersistenceHandler::use_per_op_throttled_delete_bucket() const noexcept { + return _use_per_op_throttled_delete_bucket.load(std::memory_order_relaxed); +} + +void +PersistenceHandler::set_use_per_document_throttled_delete_bucket(bool throttle) noexcept { + _use_per_op_throttled_delete_bucket.store(throttle, std::memory_order_relaxed); +} + } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index d7ebec85549..9639b772a28 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -11,6 +11,7 @@ #include <vespa/storage/common/storagecomponent.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/config-stor-filestor.h> +#include <atomic> namespace storage { @@ -37,12 +38,14 @@ public: const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } void set_throttle_merge_feed_ops(bool throttle) noexcept; + void set_use_per_document_throttled_delete_bucket(bool throttle) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const; MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const; + [[nodiscard]] bool use_per_op_throttled_delete_bucket() const noexcept; const framework::Clock & _clock; PersistenceUtil _env; @@ -51,6 +54,7 @@ private: AsyncHandler _asyncHandler; SplitJoinHandler _splitJoinHandler; SimpleMessageHandler _simpleHandler; + std::atomic<bool> _use_per_op_throttled_delete_bucket; }; } // storage diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index afab5391e2d..6eb327f021e 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -29,7 +29,8 @@ public: explicit StatEntryProcessor(std::ostream& o) : ost(o) {}; - void process(spi::DocEntry& e) override { + void process(std::unique_ptr<spi::DocEntry> entry) override { + const auto& e = *entry; ost << " Timestamp: " << e.getTimestamp() << ", "; if (e.getDocument() != nullptr) { ost << "Doc(" << e.getDocument()->getId() << ")" diff --git a/storage/src/vespa/storage/persistence/splitbitdetector.cpp b/storage/src/vespa/storage/persistence/splitbitdetector.cpp index 0a287258356..2d338be967a 100644 --- a/storage/src/vespa/storage/persistence/splitbitdetector.cpp +++ b/storage/src/vespa/storage/persistence/splitbitdetector.cpp @@ -56,17 +56,17 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor { }; std::vector<DocInfo> _firstDocs; - BucketVisitor(const document::BucketIdFactory& factory); - ~BucketVisitor(); + explicit BucketVisitor(const document::BucketIdFactory& factory); + ~BucketVisitor() override; - void process(spi::DocEntry& entry) override { - assert(entry.getDocumentId()); + void process(std::unique_ptr<spi::DocEntry> entry) override { + assert(entry->getDocumentId()); ++_docCount; - const document::DocumentId& id(*entry.getDocumentId()); + const document::DocumentId& id(*entry->getDocumentId()); document::BucketId bucket = _factory.getBucketId(id); if (_firstDocs.size() < keepFirstCount) { - _firstDocs.push_back(DocInfo(entry.getTimestamp(), id, bucket)); + _firstDocs.emplace_back(entry->getTimestamp(), id, bucket); } if (_refBucket.getRawId() == 0) { @@ -83,8 +83,6 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor { _conflictId = id; _conflictBucket = bucket; } - - return; } void printEntrySummary(std::ostream& out) { |