summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2023-11-12 18:40:54 +0100
committer GitHub <noreply@github.com> 2023-11-12 18:40:54 +0100
commit d89ef46334c9b6771d18822e23eec1a657952b01 (patch)
tree e78c51e047aed99b6e91835f9bafc0965a8030fd
parent 7dc297a448d472b6a825d21cc80249807b7350ad (diff)
parent f288e1bf9028f418e1be5568829acdceda720e89 (diff)
Merge pull request #29310 from vespa-engine/vekterli/throttled-delete-bucket
Implement `DeleteBucket` with throttled per-document async removal
-rw-r--r-- configdefinitions/src/vespa/stor-filestor.def 8
-rw-r--r-- storage/src/tests/persistence/filestorage/filestormanagertest.cpp 36
-rw-r--r-- storage/src/vespa/storage/persistence/asynchandler.cpp 120
-rw-r--r-- storage/src/vespa/storage/persistence/asynchandler.h 16
-rw-r--r-- storage/src/vespa/storage/persistence/bucketprocessor.cpp 5
-rw-r--r-- storage/src/vespa/storage/persistence/bucketprocessor.h 4
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp 5
-rw-r--r-- storage/src/vespa/storage/persistence/persistencehandler.cpp 19
-rw-r--r-- storage/src/vespa/storage/persistence/persistencehandler.h 4
-rw-r--r-- storage/src/vespa/storage/persistence/processallhandler.cpp 3
-rw-r--r-- storage/src/vespa/storage/persistence/splitbitdetector.cpp 14
11 files changed, 182 insertions, 52 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index c7ac1472d30..090f74dec12 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -114,3 +114,11 @@ async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=DYNAMIC
## Only applies if async_operation_throttler_type == DYNAMIC.
## DEPRECATED! use the async_operation_throttler struct instead
async_operation_dynamic_throttling_window_increment int default=20 restart
+
+## If set, DeleteBucket operations are internally expanded to an individually persistence-
+## throttled remove per document stored in the bucket. This makes the cost model of
+## executing a DeleteBucket symmetrical with feeding the documents to the bucket in the
+## first place.
+##
+## This is a live config.
+use_per_document_throttled_delete_bucket bool default=false
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 2b0218bf20c..96618eb9206 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -45,6 +45,8 @@ using namespace storage::api;
using storage::spi::test::makeSpiBucket;
using document::test::makeDocumentBucket;
using vespalib::IDestructorCallback;
+using vespa::config::content::StorFilestorConfig;
+using vespa::config::content::StorFilestorConfigBuilder;
using namespace ::testing;
#define ASSERT_SINGLE_REPLY(replytype, reply, link, time) \
@@ -224,7 +226,6 @@ struct TestFileStorComponents {
explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false)
: manager(nullptr)
{
- using vespa::config::content::StorFilestorConfig;
auto config_uri = config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId());
auto config = config_from<StorFilestorConfig>(config_uri);
auto fsm = std::make_unique<FileStorManager>(*config, test._node->getPersistenceProvider(),
@@ -275,7 +276,7 @@ struct PersistenceHandlerComponents : public FileStorHandlerComponents {
bucketOwnershipNotifier(component, messageSender),
persistenceHandler()
{
- vespa::config::content::StorFilestorConfig cfg;
+ StorFilestorConfig cfg;
persistenceHandler =
std::make_unique<PersistenceHandler>(executor, component, cfg,
test._node->getPersistenceProvider(),
@@ -310,7 +311,7 @@ FileStorTestBase::TearDown()
}
struct FileStorManagerTest : public FileStorTestBase {
-
+ void do_test_delete_bucket(bool use_throttled_delete);
};
TEST_F(FileStorManagerTest, header_only_put) {
@@ -743,7 +744,7 @@ TEST_F(FileStorManagerTest, priority) {
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, c.messageSender);
- vespa::config::content::StorFilestorConfig cfg;
+ StorFilestorConfig cfg;
PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
filestorHandler, bucketOwnershipNotifier, *metrics.threads[0]);
std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
@@ -1370,8 +1371,14 @@ TEST_F(FileStorManagerTest, remove_location) {
}
}
-TEST_F(FileStorManagerTest, delete_bucket) {
+void FileStorManagerTest::do_test_delete_bucket(bool use_throttled_delete) {
TestFileStorComponents c(*this);
+
+ auto config_uri = config::ConfigUri(config->getConfigId());
+ StorFilestorConfigBuilder my_config(*config_from<StorFilestorConfig>(config_uri));
+ my_config.usePerDocumentThrottledDeleteBucket = use_throttled_delete;
+ c.manager->on_configure(my_config);
+
auto& top = c.top;
// Creating a document to test with
document::DocumentId docId("id:crawler:testdoctype1:n=4000:http://www.ntnu.no/");
@@ -1409,6 +1416,20 @@ TEST_F(FileStorManagerTest, delete_bucket) {
ASSERT_TRUE(reply.get());
EXPECT_EQ(ReturnCode(ReturnCode::OK), reply->getResult());
}
+ // Bucket should be removed from DB
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
+ EXPECT_FALSE(entry.exists());
+ }
+}
+
+// TODO remove once throttled behavior is the default
+TEST_F(FileStorManagerTest, delete_bucket_legacy) {
+ do_test_delete_bucket(false); // shared scenario with usePerDocumentThrottledDeleteBucket set to false
+}
+
+TEST_F(FileStorManagerTest, delete_bucket_throttled) {
+ do_test_delete_bucket(true); // same scenario with usePerDocumentThrottledDeleteBucket set to true
}
TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) {
@@ -1452,6 +1473,11 @@ TEST_F(FileStorManagerTest, delete_bucket_rejects_outdated_bucket_info) {
EXPECT_EQ(ReturnCode::REJECTED, reply->getResult().getResult());
EXPECT_EQ(bucketInfo, reply->getBucketInfo());
}
+ // Bucket should still exist in DB
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
+ EXPECT_TRUE(entry.exists());
+ }
}
/**
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 2e7d0caf151..bb3952c8c33 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -7,6 +7,7 @@
#include "bucketprocessor.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/persistence/spi/docentry.h>
+#include <vespa/persistence/spi/doctype_gid_and_timestamp.h>
#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
@@ -71,7 +72,7 @@ private:
template<class FunctionType>
std::unique_ptr<ResultTask>
-makeResultTask(FunctionType &&function) {
+makeResultTask(FunctionType&& function) {
return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>>
(std::forward<FunctionType>(function));
}
@@ -119,8 +120,8 @@ public:
: _to_remove(to_remove)
{}
- void process(spi::DocEntry& entry) override {
- _to_remove.emplace_back(*entry.getDocumentId(), entry.getTimestamp());
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ _to_remove.emplace_back(*entry->getDocumentId(), entry->getTimestamp());
}
private:
DocumentIdsAndTimeStamps & _to_remove;
@@ -202,6 +203,28 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::
return tracker;
}
+void
+AsyncHandler::on_delete_bucket_complete(const document::Bucket& bucket) const { // Reconciles the bucket DB entry for `bucket`; invoked from the deleteBucketAsync result tasks.
+ StorBucketDatabase& db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exists() && entry->getMetaCount() > 0) { // entry still reports metadata: zero its info instead of erasing it (rationale is in the log text)
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+ // Store the zeroed info on the entry and write the entry back.
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+}
+
MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
@@ -216,33 +239,84 @@ AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::
return tracker;
}
- auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) {
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket = cmd.getBucket()]([[maybe_unused]] spi::Result::UP ignored) {
// TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
- (void) ignored;
- StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
- StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
- if (entry.exists() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- bucket.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
+ on_delete_bucket_complete(bucket);
tracker->sendReply();
});
_spi.deleteBucketAsync(bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return tracker;
}
+namespace {
+// EntryProcessor that takes ownership of each DocEntry handed to process() and appends it to a caller-supplied vector.
+class GatherBucketMetadata : public BucketProcessor::EntryProcessor {
+ std::vector<std::unique_ptr<spi::DocEntry>>& _entries; // output vector, held by reference (not owned by this processor)
+public:
+ explicit GatherBucketMetadata(std::vector<std::unique_ptr<spi::DocEntry>>& entries) noexcept
+ : _entries(entries)
+ {
+ }
+ ~GatherBucketMetadata() override = default;
+
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ _entries.emplace_back(std::move(entry)); // ownership of the entry moves into the vector
+ }
+};
+
+}
+
+MessageTracker::UP
+AsyncHandler::handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{ // DeleteBucket variant: gather the bucket's entries, issue one throttled removeByGidAsync per entry, then deleteBucketAsync once all removes are done.
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeleteBucket(%s) (with throttling)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) { // a merge is registered for this bucket: clear its status with ABORTED before deleting
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket spi_bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(spi_bucket, cmd.getBucketInfo())) {
+ return tracker; // mismatch: nothing is removed; tracker is handed back (presumably already marked failed by the check — confirm)
+ }
+ // Step 1: iterate the whole bucket and collect the entries to remove.
+ std::vector<std::unique_ptr<spi::DocEntry>> meta_entries;
+ {
+ GatherBucketMetadata meta_proc(meta_entries);
+ auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ);
+ // Note: we only explicitly remove Put entries; tombstones are expected to be
+ // cheap and will be purged as part of the subsequent DeleteBucket operation.
+ // (Additionally, the SPI does not expose a way to remove a remove...)
+ BucketProcessor::iterateAll(_spi, spi_bucket, "true",
+ std::make_shared<document::NoFields>(),
+ meta_proc, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
+ }
+ auto invoke_delete_on_zero_refs = vespalib::makeSharedLambdaCallback([this, spi_bucket, bucket = cmd.getBucket(), tracker = std::move(tracker)]() mutable { // Step 3 body; takes over `tracker` and runs once all refs to this callback are dropped (see closing comment)
+ LOG(debug, "%s: about to invoke deleteBucketAsync", bucket.toString().c_str());
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket]([[maybe_unused]] spi::Result::UP ignored) {
+ LOG(debug, "%s: deleteBucket callback invoked; sending reply", bucket.toString().c_str());
+ on_delete_bucket_complete(bucket);
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(spi_bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket.getBucketId(), std::move(task)));
+ });
+ auto& throttler = _env._fileStorHandler.operation_throttler(); // Step 2: one removeByGidAsync per gathered entry, each holding its own throttle token
+ for (auto& meta : meta_entries) {
+ auto token = throttler.blocking_acquire_one(); // presumably blocks this thread until a token is available (per the name) — confirm
+ std::vector<spi::DocTypeGidAndTimestamp> to_remove = {{meta->getDocumentType(), meta->getGid(), meta->getTimestamp()}};
+ auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token), invoke_delete_on_zero_refs]([[maybe_unused]] spi::Result::UP ignored) { // captures keep the token and a ref to the delete dispatcher alive until this task is destroyed
+ LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str());
+ // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly.
+ });
+ LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %zu)", cmd.getBucket().toString().c_str(),
+ vespalib::string(meta->getDocumentType()).c_str(), meta->getGid().toString().c_str(), meta->getTimestamp().getValue()); // NOTE(review): %zu assumes getValue() yields a size_t-compatible type — confirm
+ _spi.removeByGidAsync(spi_bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ }
+ // Actual bucket deletion happens when all remove ops have ACKed and dropped their refs to the destructor-invoked
+ // deleteBucket dispatcher. Note: this works transparently when the bucket is empty (no refs; happens immediately).
+ return tracker; // `tracker` was moved into invoke_delete_on_zero_refs above, so a moved-from pointer is returned here
+}
+
MessageTracker::UP
AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
{
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 5ff20c6b9c5..1433176036b 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -24,22 +24,24 @@ class MessageTracker;
class AsyncHandler {
using MessageTrackerUP = std::unique_ptr<MessageTracker>;
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
- vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier&,
+ vespalib::ISequencedTaskExecutor& executor, const document::BucketIdFactory& bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const;
- static bool is_async_unconditional_message(const api::StorageMessage & cmd) noexcept;
+ static bool is_async_unconditional_message(const api::StorageMessage& cmd) noexcept;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- static bool tasConditionExists(const api::TestAndSetCommand & cmd);
- bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- spi::Context & context, bool missingDocumentImpliesMatch = false) const;
+ [[nodiscard]] bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
+ static bool tasConditionExists(const api::TestAndSetCommand& cmd);
+ bool tasConditionMatches(const api::TestAndSetCommand& cmd, MessageTracker& tracker,
+ spi::Context& context, bool missingDocumentImpliesMatch = false) const;
+ void on_delete_bucket_complete(const document::Bucket& bucket) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
BucketOwnershipNotifier & _bucketOwnershipNotifier;
diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.cpp b/storage/src/vespa/storage/persistence/bucketprocessor.cpp
index 99654b7c16a..84df361d2e1 100644
--- a/storage/src/vespa/storage/persistence/bucketprocessor.cpp
+++ b/storage/src/vespa/storage/persistence/bucketprocessor.cpp
@@ -2,6 +2,7 @@
#include "bucketprocessor.h"
#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/persistence/spi/docentry.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <cassert>
@@ -68,8 +69,8 @@ BucketProcessor::iterateAll(spi::PersistenceProvider& provider,
throw std::runtime_error(ss.str());
}
- for (size_t i = 0; i < result.getEntries().size(); ++i) {
- processor.process(*result.getEntries()[i]);
+ for (auto& entry : result.steal_entries()) {
+ processor.process(std::move(entry));
}
if (result.isCompleted()) {
diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.h b/storage/src/vespa/storage/persistence/bucketprocessor.h
index 3c2383967c8..d295566e0b1 100644
--- a/storage/src/vespa/storage/persistence/bucketprocessor.h
+++ b/storage/src/vespa/storage/persistence/bucketprocessor.h
@@ -23,8 +23,8 @@ class BucketProcessor
public:
class EntryProcessor {
public:
- virtual ~EntryProcessor() {};
- virtual void process(spi::DocEntry&) = 0;
+ virtual ~EntryProcessor() = default;
+ virtual void process(std::unique_ptr<spi::DocEntry>) = 0;
};
static void iterateAll(spi::PersistenceProvider&,
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index e6cd8987c0a..61c7da6e286 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -230,7 +230,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
numResponseThreads, 10000,
true, selectSequencer(_config->responseSequencerType));
assert(_sequencedExecutor);
- LOG(spam, "Setting up the disk");
+ LOG(spam, "Setting up %u persistence threads", numThreads);
for (uint32_t i = 0; i < numThreads; i++) {
_threads.push_back(std::make_unique<PersistenceThread>(createRegisteredHandler(_component),
*_filestorHandler, i % numStripes, _component));
@@ -241,13 +241,14 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size());
_filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params);
}
- // TODO remove once desired dynamic throttling behavior is set in stone
+ // TODO remove once desired throttling behavior is set in stone
{
_filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
_filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops);
std::lock_guard guard(_lock);
for (auto& ph : _persistenceHandlers) {
ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops);
+ ph->set_use_per_document_throttled_delete_bucket(config.usePerDocumentThrottledDeleteBucket);
}
}
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 33171296abf..78ad6eac0e2 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -24,7 +24,8 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
cfg.commonMergeChainOptimalizationMinimumSize),
_asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
- _simpleHandler(_env, provider, component.getBucketIdFactory())
+ _simpleHandler(_env, provider, component.getBucketIdFactory()),
+ _use_per_op_throttled_delete_bucket(false)
{
}
@@ -68,7 +69,11 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::CREATEBUCKET_ID:
return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ if (use_per_op_throttled_delete_bucket()) {
+ return _asyncHandler.handle_delete_bucket_throttling(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ } else {
+ return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ }
case api::MessageType::JOINBUCKETS_ID:
return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
@@ -181,4 +186,14 @@ PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept
_mergeHandler.set_throttle_merge_feed_ops(throttle);
}
+bool
+PersistenceHandler::use_per_op_throttled_delete_bucket() const noexcept { // Reads the flag that selects the throttled DeleteBucket handler in handleCommandSplitByType.
+ return _use_per_op_throttled_delete_bucket.load(std::memory_order_relaxed); // relaxed atomic load of the flag
+}
+
+void
+PersistenceHandler::set_use_per_document_throttled_delete_bucket(bool throttle) noexcept { // Updates the flag; FileStorManager::on_configure calls this with config.usePerDocumentThrottledDeleteBucket.
+ _use_per_op_throttled_delete_bucket.store(throttle, std::memory_order_relaxed); // relaxed atomic store of the flag
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index d7ebec85549..9639b772a28 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -11,6 +11,7 @@
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/config-stor-filestor.h>
+#include <atomic>
namespace storage {
@@ -37,12 +38,14 @@ public:
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
void set_throttle_merge_feed_ops(bool throttle) noexcept;
+ void set_use_per_document_throttled_delete_bucket(bool throttle) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const;
MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;
+ [[nodiscard]] bool use_per_op_throttled_delete_bucket() const noexcept;
const framework::Clock & _clock;
PersistenceUtil _env;
@@ -51,6 +54,7 @@ private:
AsyncHandler _asyncHandler;
SplitJoinHandler _splitJoinHandler;
SimpleMessageHandler _simpleHandler;
+ std::atomic<bool> _use_per_op_throttled_delete_bucket;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index afab5391e2d..6eb327f021e 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -29,7 +29,8 @@ public:
explicit StatEntryProcessor(std::ostream& o)
: ost(o) {};
- void process(spi::DocEntry& e) override {
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ const auto& e = *entry;
ost << " Timestamp: " << e.getTimestamp() << ", ";
if (e.getDocument() != nullptr) {
ost << "Doc(" << e.getDocument()->getId() << ")"
diff --git a/storage/src/vespa/storage/persistence/splitbitdetector.cpp b/storage/src/vespa/storage/persistence/splitbitdetector.cpp
index 0a287258356..2d338be967a 100644
--- a/storage/src/vespa/storage/persistence/splitbitdetector.cpp
+++ b/storage/src/vespa/storage/persistence/splitbitdetector.cpp
@@ -56,17 +56,17 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor {
};
std::vector<DocInfo> _firstDocs;
- BucketVisitor(const document::BucketIdFactory& factory);
- ~BucketVisitor();
+ explicit BucketVisitor(const document::BucketIdFactory& factory);
+ ~BucketVisitor() override;
- void process(spi::DocEntry& entry) override {
- assert(entry.getDocumentId());
+ void process(std::unique_ptr<spi::DocEntry> entry) override {
+ assert(entry->getDocumentId());
++_docCount;
- const document::DocumentId& id(*entry.getDocumentId());
+ const document::DocumentId& id(*entry->getDocumentId());
document::BucketId bucket = _factory.getBucketId(id);
if (_firstDocs.size() < keepFirstCount) {
- _firstDocs.push_back(DocInfo(entry.getTimestamp(), id, bucket));
+ _firstDocs.emplace_back(entry->getTimestamp(), id, bucket);
}
if (_refBucket.getRawId() == 0) {
@@ -83,8 +83,6 @@ struct BucketVisitor : public BucketProcessor::EntryProcessor {
_conflictId = id;
_conflictBucket = bucket;
}
-
- return;
}
void printEntrySummary(std::ostream& out) {