summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/common/metricstest.cpp11
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp8
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp33
-rw-r--r--storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp12
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp21
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormetrics.h19
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp60
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
9 files changed, 102 insertions, 79 deletions
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index 61332bb9ad6..899c1979e86 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -122,9 +122,6 @@ void MetricsTest::createFakeLoad()
metrics.docs.inc(10 * n);
metrics.bytes.inc(10240 * n);
}
- _filestorMetrics->directoryEvents.inc(5);
- _filestorMetrics->partitionEvents.inc(4);
- _filestorMetrics->diskEvents.inc(3);
{
FileStorMetrics& disk(*_filestorMetrics);
disk.queueSize.addValue(4 * n);
@@ -147,18 +144,10 @@ void MetricsTest::createFakeLoad()
thread.update.notFound.inc(1 * n);
thread.update.latencyRead.addValue(2 * n);
thread.update.latency.addValue(7 * n);
- thread.revert.count.inc(2 * n);
- thread.revert.notFound.inc(n / 2);
- thread.revert.latency.addValue(2 * n);
thread.visit.count.inc(6 * n);
thread.deleteBuckets.count.inc(1 * n);
- thread.repairs.count.inc(3 * n);
- thread.repairFixed.inc(1 * n);
thread.splitBuckets.count.inc(20 * n);
- thread.movedBuckets.count.inc(1 * n);
- thread.readBucketInfo.count.inc(2 * n);
- thread.internalJoin.count.inc(3 * n);
thread.mergeBuckets.count.inc(2 * n);
thread.getBucketDiff.count.inc(4 * n);
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 96618eb9206..4911ad88692 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -1421,6 +1421,14 @@ void FileStorManagerTest::do_test_delete_bucket(bool use_throttled_delete) {
StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bid, "foo"));
EXPECT_FALSE(entry.exists());
}
+ if (use_throttled_delete) {
+ auto& metrics = thread_metrics_of(*c.manager)->remove_by_gid;
+ EXPECT_EQ(metrics.failed.getValue(), 0);
+ EXPECT_EQ(metrics.count.getValue(), 1);
+ // We can't reliably test the actual latency here without wiring mock clock bumping into
+ // the async remove by GID execution, but we can at least test that we updated the metric.
+ EXPECT_EQ(metrics.latency.getCount(), 1);
+ }
}
// TODO remove once throttled behavior is the default
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 6f80ffe0727..a480ba2740f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1603,6 +1603,39 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
}
+TEST_F(MergeThrottlerTest, enqueued_merge_not_started_if_insufficient_memory_available) {
+ // See `queued_merges_are_not_counted_towards_memory_usage` test for magic number rationale
+ const auto max_pending = throttler_max_merges_pending(0);
+ ASSERT_LT(max_pending, 1000);
+ ASSERT_GT(max_pending, 1);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
+
+ // Fill up entire active window and enqueue a single merge
+ fill_throttler_queue_with_n_commands(0, 0);
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(11_Mi).create());
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ // Drain all active merges. As long as we have other active merges, the enqueued merge should not
+ // be allowed through since it's too large. Eventually it will hit the "at least one merge must
+ // be allowed at any time regardless of size" exception and is dequeued.
+ for (uint32_t i = 0; i < max_pending; ++i) {
+ auto fwd_cmd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::OK)); // Unwind reply for completed merge
+
+ if (i < max_pending - 1) {
+ // Merge should still be in the queue, as it requires 11 MiB, and we only have 10 MiB.
+ // It will eventually be executed when the window is empty (see below).
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime);
+ }
+ }
+ // We've freed up the entire send window, so the over-sized merge can finally squeeze through.
+ waitUntilMergeQueueIs(throttler(0), 0, _messageWaitTime);
+}
+
namespace {
vespalib::HwInfo make_mem_info(uint64_t mem_size) {
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp
index d5d4accd70a..e6de5449b60 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp
@@ -5,7 +5,9 @@
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/vespalib/util/backtrace.h>
#include <cassert>
+#include <cinttypes>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.distributor_bucket_space_repo");
@@ -34,7 +36,10 @@ DistributorBucketSpace &
DistributorBucketSpaceRepo::get(BucketSpace bucketSpace)
{
auto itr = _map.find(bucketSpace);
- assert(itr != _map.end());
+ if (itr == _map.end()) [[unlikely]] {
+ LOG(error, "Bucket space %" PRIu64 " does not have a valid mapping. %s", bucketSpace.getId(), vespalib::getStackTrace(0).c_str());
+ abort();
+ }
return *itr->second;
}
@@ -42,7 +47,10 @@ const DistributorBucketSpace &
DistributorBucketSpaceRepo::get(BucketSpace bucketSpace) const
{
auto itr = _map.find(bucketSpace);
- assert(itr != _map.end());
+ if (itr == _map.end()) [[unlikely]] {
+ LOG(error, "Bucket space %" PRIu64 " does not have a valid mapping. %s", bucketSpace.getId(), vespalib::getStackTrace(0).c_str());
+ abort();
+ }
return *itr->second;
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index bb3952c8c33..a69f9e55afb 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -300,15 +300,26 @@ AsyncHandler::handle_delete_bucket_throttling(api::DeleteBucketCommand& cmd, Mes
});
_spi.deleteBucketAsync(spi_bucket, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket.getBucketId(), std::move(task)));
});
+
auto& throttler = _env._fileStorHandler.operation_throttler();
+ auto* remove_by_gid_metric = &_env._metrics.remove_by_gid;
+
for (auto& meta : meta_entries) {
auto token = throttler.blocking_acquire_one();
+ remove_by_gid_metric->count.inc();
std::vector<spi::DocTypeGidAndTimestamp> to_remove = {{meta->getDocumentType(), meta->getGid(), meta->getTimestamp()}};
- auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token), invoke_delete_on_zero_refs]([[maybe_unused]] spi::Result::UP ignored) {
- LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str());
- // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly.
- });
- LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %zu)", cmd.getBucket().toString().c_str(),
+ auto task = makeResultTask([bucket = cmd.getBucket(), token = std::move(token),
+ invoke_delete_on_zero_refs, remove_by_gid_metric,
+ op_timer = framework::MilliSecTimer(_env._component.getClock())]
+ (spi::Result::UP result) {
+ if (result->hasError()) {
+ remove_by_gid_metric->failed.inc();
+ }
+ remove_by_gid_metric->latency.addValue(op_timer.getElapsedTimeAsDouble());
+ LOG(spam, "%s: completed removeByGidAsync operation", bucket.toString().c_str());
+ // Nothing else clever to do here. Throttle token and deleteBucket dispatch refs dropped implicitly.
+ });
+ LOG(spam, "%s: about to invoke removeByGidAsync(%s, %s, %" PRIu64 ")", cmd.getBucket().toString().c_str(),
vespalib::string(meta->getDocumentType()).c_str(), meta->getGid().toString().c_str(), meta->getTimestamp().getValue());
_spi.removeByGidAsync(spi_bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
index 6072a1dbc3e..9ed15605fb2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
@@ -154,13 +154,11 @@ FileStorThreadMetrics::FileStorThreadMetrics(const std::string& name, const std:
removeLocation("remove_location", "Remove location", this),
statBucket("stat_bucket", "Stat bucket", this),
update(this),
- revert("revert", "Revert", this),
createIterator("createiterator", {}, this),
visit(this),
createBuckets("createbuckets", "Number of buckets that has been created.", this),
deleteBuckets("deletebuckets", "Number of buckets that has been deleted.", this),
- repairs("bucketverified", "Number of times buckets have been checked.", this),
- repairFixed("bucketfixed", {}, "Number of times bucket has been fixed because of corruption", this),
+ remove_by_gid("remove_by_gid", "Internal single-document remove operations used by DeleteBucket", this),
recheckBucketInfo("recheckbucketinfo",
"Number of times bucket info has been explicitly "
"rechecked due to buckets being marked modified by "
@@ -169,11 +167,6 @@ FileStorThreadMetrics::FileStorThreadMetrics(const std::string& name, const std:
splitBuckets("splitbuckets", "Number of times buckets have been split.", this),
joinBuckets("joinbuckets", "Number of times buckets have been joined.", this),
setBucketStates("setbucketstates", "Number of times buckets have been activated or deactivated.", this),
- movedBuckets("movedbuckets", "Number of buckets moved between disks", this),
- readBucketList("readbucketlist", "Number of read bucket list requests", this),
- readBucketInfo("readbucketinfo", "Number of read bucket info requests", this),
- internalJoin("internaljoin", "Number of joins to join buckets on multiple disks during "
- "storage initialization.", this),
mergeBuckets("mergebuckets", "Number of times buckets have been merged.", this),
getBucketDiff("getbucketdiff", "Number of getbucketdiff commands that have been processed.", this),
applyBucketDiff("applybucketdiff", "Number of applybucketdiff commands that have been processed.", this),
@@ -211,18 +204,11 @@ FileStorMetrics::FileStorMetrics()
throttle_window_size("throttle_window_size", {}, "Current size of async operation throttler window size", this),
throttle_waiting_threads("throttle_waiting_threads", {}, "Number of threads waiting to acquire a throttle token", this),
throttle_active_tokens("throttle_active_tokens", {}, "Current number of active throttle tokens", this),
- waitingForLockHitRate("waitingforlockrate", {},
- "Amount of times a filestor thread has needed to wait for "
- "lock to take next message in queue.", this),
active_operations(this),
- directoryEvents("directoryevents", {}, "Number of directory events received.", this),
- partitionEvents("partitionevents", {}, "Number of partition events received.", this),
- diskEvents("diskevents", {}, "Number of disk events received.", this),
bucket_db_init_latency("bucket_db_init_latency", {}, "Time taken (in ms) to initialize bucket databases with "
"information from the persistence provider", this)
{
pendingMerges.unsetOnZeroValue();
- waitingForLockHitRate.unsetOnZeroValue();
}
FileStorMetrics::~FileStorMetrics() = default;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index 85a3813c9eb..75182705323 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -1,11 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::FileStorMetrics
- * @ingroup filestorage
- *
- * @brief Metrics for the file store threads.
- *
- * @version $Id$
+ * Metrics for the peristence threads.
*/
#pragma once
@@ -98,21 +93,15 @@ struct FileStorThreadMetrics : public metrics::MetricSet
Op removeLocation;
Op statBucket;
Update update;
- OpWithNotFound revert;
Op createIterator;
Visitor visit;
Op createBuckets;
Op deleteBuckets;
- Op repairs;
- metrics::LongCountMetric repairFixed;
+ Op remove_by_gid;
Op recheckBucketInfo;
Op splitBuckets;
Op joinBuckets;
Op setBucketStates;
- Op movedBuckets;
- Op readBucketList;
- Op readBucketInfo;
- Op internalJoin;
Op mergeBuckets;
Op getBucketDiff;
Op applyBucketDiff;
@@ -148,11 +137,7 @@ struct FileStorMetrics : public metrics::MetricSet
metrics::LongAverageMetric throttle_window_size;
metrics::LongAverageMetric throttle_waiting_threads;
metrics::LongAverageMetric throttle_active_tokens;
- metrics::DoubleAverageMetric waitingForLockHitRate;
ActiveOperationsMetrics active_operations;
- metrics::LongCountMetric directoryEvents;
- metrics::LongCountMetric partitionEvents;
- metrics::LongCountMetric diskEvents;
metrics::LongAverageMetric bucket_db_init_latency;
FileStorMetrics();
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 54a4ddbc780..b99c238f9ab 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -419,6 +419,13 @@ MergeThrottler::getNextQueuedMerge()
return entry._msg;
}
+const api::MergeBucketCommand&
+MergeThrottler::peek_merge_queue() const noexcept
+{
+ assert(!_queue.empty());
+ return dynamic_cast<const api::MergeBucketCommand&>(*_queue.begin()->_msg);
+}
+
void
MergeThrottler::enqueue_merge_for_later_processing(
const api::StorageMessage::SP& msg,
@@ -549,45 +556,40 @@ MergeThrottler::rejectOutdatedQueuedMerges(
// If there's a merge queued and the throttling policy allows for
// the merge to be processed, do so.
bool
-MergeThrottler::attemptProcessNextQueuedMerge(
- MessageGuard& msgGuard)
+MergeThrottler::attemptProcessNextQueuedMerge(MessageGuard& msgGuard)
{
- if (!canProcessNewMerge()) {
+ if (_queue.empty()) {
+ return false;
+ }
+ if ( ! (canProcessNewMerge() && accepting_merge_is_within_memory_limits(peek_merge_queue()))) {
// Should never reach a non-sending state when there are
// no to-be-replied merges that can trigger a new processing
assert(!_merges.empty());
return false;
}
+ // If we get here, there must be something to dequeue.
api::StorageMessage::SP msg = getNextQueuedMerge();
- if (msg) {
- // In case of resends and whatnot, it's possible for a merge
- // command to be in the queue while another higher priority
- // command for the same bucket sneaks in front of it and gets
- // a slot. Send BUSY in this case to make the distributor retry
- // later, at which point the existing merge has hopefully gone
- // through and the new one will be effectively a no-op to perform
- if (!isMergeAlreadyKnown(msg)) {
- LOG(spam, "Processing queued merge %s", msg->toString().c_str());
- processNewMergeCommand(msg, msgGuard);
- } else {
- vespalib::asciistream oss;
- oss << "Queued merge " << msg->toString() << " is out of date; it has already "
- "been started by someone else since it was queued";
- LOG(debug, "%s", oss.c_str());
- sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
- api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
- msgGuard, _metrics->chaining);
- }
- return true;
+ assert(msg);
+ // In case of resends and whatnot, it's possible for a merge
+ // command to be in the queue while another higher priority
+ // command for the same bucket sneaks in front of it and gets
+ // a slot. Send BUSY in this case to make the distributor retry
+ // later, at which point the existing merge has hopefully gone
+ // through and the new one will be effectively a no-op to perform
+ if (!isMergeAlreadyKnown(msg)) {
+ LOG(spam, "Processing queued merge %s", msg->toString().c_str());
+ processNewMergeCommand(msg, msgGuard);
} else {
- if (_queue.empty()) {
- LOG(spam, "Queue empty - no merges to process");
- } else {
- LOG(spam, "Merges queued, but throttle policy disallows further merges at this time");
- }
+ vespalib::asciistream oss;
+ oss << "Queued merge " << msg->toString() << " is out of date; it has already "
+ "been started by someone else since it was queued";
+ LOG(debug, "%s", oss.c_str());
+ sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
+ api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
+ msgGuard, _metrics->chaining);
}
- return false;
+ return true;
}
bool
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index a5559c159bf..e210a8bfb8b 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -372,6 +372,7 @@ private:
[[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] const api::MergeBucketCommand& peek_merge_queue() const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,