summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-09-05 13:14:04 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-09-05 13:19:57 +0000
commit7b9ba34cd00ab2b9416fc55f1eef31a7fc43f717 (patch)
tree1f15ccb31d1bfdc8574e58fde742e3e856f25d2b /storage
parent58a82398bcfe66c119faced12bf68cdfc6407536 (diff)
Enumerate pending message IDs on a bucket predicate basis
Lets a caller selectively enumerate all IDs of messages pending towards buckets that match a caller-provided predicate function. A separate message ID callback is invoked per distinct message. Also remove hard-coded multi-index numeric indices in favor of named constants.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp125
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp50
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h42
3 files changed, 155 insertions, 62 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 8277281206d..35dc072b953 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -1,19 +1,24 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
#include <vespa/document/base/testdocman.h>
-#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
-#include <tests/common/dummystoragelink.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <functional>
+#include <vector>
+using document::Bucket;
+using document::BucketId;
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
using namespace ::testing;
@@ -23,8 +28,7 @@ namespace storage::distributor {
using namespace std::chrono_literals;
struct PendingMessageTrackerTest : Test {
- void insertMessages(PendingMessageTracker& tracker);
-
+ std::vector<std::shared_ptr<api::StorageCommand>> insertMessages(PendingMessageTracker& tracker);
};
namespace {
@@ -33,11 +37,7 @@ class RequestBuilder {
uint16_t _toNode;
std::chrono::milliseconds _atTime;
public:
- RequestBuilder()
- : _toNode(0),
- _atTime()
- {
- }
+ RequestBuilder() noexcept : _toNode(0), _atTime() {}
RequestBuilder& atTime(std::chrono::milliseconds t) {
_atTime = t;
@@ -59,14 +59,12 @@ makeStorageAddress(uint16_t node) {
return {&_storage, lib::NodeType::STORAGE, node};
}
-class Fixture
-{
- StorageComponentRegisterImpl _compReg;
+class Fixture {
+ StorageComponentRegisterImpl _compReg;
framework::defaultimplementation::FakeClock _clock;
- std::unique_ptr<PendingMessageTracker> _tracker;
- document::TestDocMan _testDocMan;
+ std::unique_ptr<PendingMessageTracker> _tracker;
+ document::TestDocMan _testDocMan;
public:
-
Fixture();
~Fixture();
@@ -110,21 +108,26 @@ public:
PendingMessageTracker& tracker() { return *_tracker; }
auto& clock() { return _clock; }
+ std::vector<uint64_t> enumerate_msg_ids(const std::function<bool(const document::Bucket&)>& bucket_predicate) const {
+ std::vector<uint64_t> enumerated_ids;
+ auto insert_enumerated_ids = [&](uint64_t msg_id) { enumerated_ids.emplace_back(msg_id); };
+
+ _tracker->enumerate_matching_pending_bucket_ops(bucket_predicate, insert_enumerated_ids);
+ return enumerated_ids;
+ }
+
private:
- std::string createDummyIdString(const document::BucketId& bucket) const {
+ static std::string createDummyIdString(const document::BucketId& bucket) {
std::ostringstream id;
id << "id:foo:testdoctype1:n=" << bucket.getId() << ":foo";
return id.str();
}
- document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const
- {
+ document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const {
return _testDocMan.createDocument("foobar", createDummyIdString(bucket));
}
- std::shared_ptr<api::RemoveCommand> createRemoveToNode(
- uint16_t node) const
- {
+ std::shared_ptr<api::RemoveCommand> createRemoveToNode(uint16_t node) const {
document::BucketId bucket(16, 1234);
auto cmd = std::make_shared<api::RemoveCommand>(
makeDocumentBucket(bucket),
@@ -195,9 +198,10 @@ TEST_F(PendingMessageTrackerTest, simple) {
}
}
-void
+std::vector<std::shared_ptr<api::StorageCommand>>
PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
{
+ std::vector<std::shared_ptr<api::StorageCommand>> inserted;
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
ost << "id:footype:testdoc:n=1234:" << i;
@@ -206,15 +210,19 @@ PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
document::DocumentId(ost.str()), 1000 + i);
remove->setAddress(makeStorageAddress(i % 2));
tracker.insert(remove);
+ inserted.emplace_back(std::move(remove));
}
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
ost << "id:footype:testdoc:n=4567:" << i;
- auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)), document::DocumentId(ost.str()), 2000 + i);
+ auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)),
+ document::DocumentId(ost.str()), 2000 + i);
remove->setAddress(makeStorageAddress(i % 2));
tracker.insert(remove);
+ inserted.emplace_back(std::move(remove));
}
+ return inserted;
}
TEST_F(PendingMessageTrackerTest, start_page) {
@@ -291,15 +299,13 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
namespace {
-class TestChecker : public PendingMessageTracker::Checker
-{
+class TestChecker : public PendingMessageTracker::Checker {
public:
uint8_t pri;
- TestChecker() : pri(UINT8_MAX) {}
+ TestChecker() noexcept : pri(UINT8_MAX) {}
- bool check(uint32_t msgType, uint16_t node, uint8_t p) override {
- (void) node;
+ bool check(uint32_t msgType, [[maybe_unused]] uint16_t node, uint8_t p) override {
if (msgType == api::MessageType::REMOVE_ID) {
pri = p;
return false;
@@ -309,7 +315,6 @@ public:
}
};
-
}
TEST_F(PendingMessageTrackerTest, get_pending_message_types) {
@@ -370,12 +375,10 @@ TEST_F(PendingMessageTrackerTest, has_pending_message) {
namespace {
-class OperationEnumerator : public PendingMessageTracker::Checker
-{
+class OperationEnumerator : public PendingMessageTracker::Checker {
std::ostringstream ss;
public:
- bool check(uint32_t msgType, uint16_t node, uint8_t p) override {
- (void) p;
+ bool check(uint32_t msgType, uint16_t node, [[maybe_unused]] uint8_t p) override {
ss << api::MessageType::get(static_cast<api::MessageType::Id>(msgType)).getName()
<< " -> " << node << "\n";
@@ -574,4 +577,58 @@ TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_s
}
}
+namespace {
+
+auto bucket_id_eq_fn(BucketId matched_id) {
+ return [matched_id](auto& bucket) noexcept {
+ return bucket.getBucketId() == matched_id;
+ };
+}
+
+auto bucket_eq_fn(Bucket matched_bucket) {
+ return [matched_bucket](auto& bucket) noexcept {
+ return bucket == matched_bucket;
+ };
+}
+
+}
+
+TEST_F(PendingMessageTrackerTest, can_enumerate_all_message_ids_for_ops_to_matching_buckets) {
+ Fixture f;
+ BucketId bucket_id_a(16, 1234);
+ BucketId bucket_id_b(16, 4567);
+ // This inserts 4 ops for bucket {16, 1234} (bucket 'a') and 4 ops for {16, 4567} (bucket 'b')
+ auto inserted_ops = insertMessages(f.tracker());
+ std::vector<uint64_t> bucket_a_msgs_ids, bucket_b_msgs_ids;
+ for (auto& op : inserted_ops) {
+ if (op->getBucketId() == bucket_id_a) {
+ bucket_a_msgs_ids.emplace_back(op->getMsgId());
+ } else {
+ bucket_b_msgs_ids.emplace_back(op->getMsgId());
+ }
+ }
+
+ // Match all for bucket 'a'
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_a)), UnorderedElementsAreArray(bucket_a_msgs_ids));
+ // Match all for bucket 'b'
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_b)), UnorderedElementsAreArray(bucket_b_msgs_ids));
+ // Match no buckets
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(BucketId(16, 7890))), IsEmpty());
+
+ const auto default_space = document::FixedBucketSpaces::default_space();
+ const auto global_space = document::FixedBucketSpaces::global_space();
+
+ // Message to global bucket space (the former messages were all in the default space)
+ auto global_msg = std::make_shared<api::RequestBucketInfoCommand>(global_space, std::vector<BucketId>({bucket_id_a}));
+ global_msg->setAddress(makeStorageAddress(3));
+ f.tracker().insert(global_msg);
+
+ // Default space has the expected existing entries
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(default_space, bucket_id_a))),
+ UnorderedElementsAreArray(bucket_a_msgs_ids));
+ // Global space has only 1 entry
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(global_space, bucket_id_a))),
+ ElementsAre(global_msg->getMsgId()));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 7b3cdacf702..c32b3b83c05 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -40,7 +40,8 @@ PendingMessageTracker::MessageEntry::toHtml() const {
vespalib::asciistream ss;
ss << "<li><i>Node " << nodeIdx << "</i>: "
<< "<b>" << vespalib::to_string(timeStamp) << "</b> "
- << api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n";
+ << api::MessageType::get(api::MessageType::Id(msgType)).getName()
+ << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n";
return ss.str();
}
@@ -83,7 +84,7 @@ std::vector<uint64_t>
PendingMessageTracker::clearMessagesForNode(uint16_t node)
{
std::lock_guard guard(_lock);
- MessagesByNodeAndBucket& idx(boost::multi_index::get<1>(_messages));
+ auto& idx = boost::multi_index::get<IndexByNodeAndBucket>(_messages);
auto range = pairAsRange(idx.equal_range(boost::make_tuple(node)));
std::vector<uint64_t> erasedIds;
@@ -97,6 +98,27 @@ PendingMessageTracker::clearMessagesForNode(uint16_t node)
}
void
+PendingMessageTracker::enumerate_matching_pending_bucket_ops(
+ const std::function<bool(const document::Bucket&)>& bucket_predicate,
+ const std::function<void(uint64_t)>& msg_id_callback) const
+{
+ std::lock_guard guard(_lock);
+ const auto& idx = boost::multi_index::get<IndexByBucketAndType>(_messages);
+ auto iter = idx.begin();
+ const auto last = idx.end();
+ while (iter != last) {
+ const auto check_bucket = iter->bucket;
+ const bool match = bucket_predicate(check_bucket);
+ do {
+ if (match) {
+ msg_id_callback(iter->msgId);
+ }
+ ++iter;
+ } while ((iter != last) && (iter->bucket == check_bucket));
+ }
+}
+
+void
PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg)
{
if (msg->getAddress()) {
@@ -126,8 +148,8 @@ PendingMessageTracker::reply(const api::StorageReply& r)
uint64_t msgId = r.getMsgId();
std::unique_lock guard(_lock);
- MessagesByMsgId& msgs = boost::multi_index::get<0>(_messages);
- MessagesByMsgId::iterator iter = msgs.find(msgId);
+ auto& msgs = boost::multi_index::get<IndexByMessageId>(_messages);
+ auto iter = msgs.find(msgId);
if (iter != msgs.end()) {
bucket = iter->bucket;
_nodeInfo.decPending(r.getAddress()->getIndex());
@@ -184,7 +206,7 @@ bool range_is_empty_or_only_has_read_ops(const Range& range) noexcept {
bool
PendingMessageTracker::bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept
{
- auto& bucket_idx = boost::multi_index::get<2>(_messages);
+ auto& bucket_idx = boost::multi_index::get<IndexByBucketAndType>(_messages);
auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
return range_is_empty_or_only_has_read_ops(pending_tasks_for_bucket);
}
@@ -243,30 +265,30 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range)
}
void
-PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucket &bucket, Checker& checker) const
+PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucket& bucket, Checker& checker) const
{
std::lock_guard guard(_lock);
- const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
+ const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages);
auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket)));
runCheckerOnRange(checker, range);
}
void
-PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Checker& checker) const
+PendingMessageTracker::checkPendingMessages(const document::Bucket& bucket, Checker& checker) const
{
std::lock_guard guard(_lock);
- const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages));
+ const auto& msgs = boost::multi_index::get<IndexByBucketAndType>(_messages);
auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket)));
runCheckerOnRange(checker, range);
}
bool
-PendingMessageTracker::hasPendingMessage(uint16_t node, const document::Bucket &bucket, uint32_t messageType) const
+PendingMessageTracker::hasPendingMessage(uint16_t node, const document::Bucket& bucket, uint32_t messageType) const
{
std::lock_guard guard(_lock);
- const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
+ const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages);
auto range = msgs.equal_range(boost::make_tuple(node, bucket, messageType));
return (range.first != range.second);
@@ -283,7 +305,7 @@ void
PendingMessageTracker::getStatusPerBucket(std::ostream& out) const
{
std::lock_guard guard(_lock);
- const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages);
+ const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages);
using BucketMap = std::map<document::Bucket, std::vector<vespalib::string>>;
BucketMap perBucketMsgs;
for (const auto& msg : msgs) {
@@ -312,9 +334,9 @@ void
PendingMessageTracker::getStatusPerNode(std::ostream& out) const
{
std::lock_guard guard(_lock);
- const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages);
+ const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages);
int lastNode = -1;
- for (const auto & node : msgs) {
+ for (const auto& node : msgs) {
if (node.nodeIdx != lastNode) {
if (lastNode != -1) {
out << "</ul>\n";
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 4b5655d3f3c..736f2918401 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -2,22 +2,23 @@
#pragma once
#include "nodeinfo.h"
-#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
-#include <vespa/storageframework/generic/component/componentregister.h>
-#include <vespa/storageframework/generic/component/component.h>
#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageframework/generic/component/component.h>
+#include <vespa/storageframework/generic/component/componentregister.h>
+#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/vespalib/stllike/hash_set.h>
-#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/identity.hpp>
-#include <boost/multi_index/member.hpp>
#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
-#include <boost/multi_index/composite_key.hpp>
-#include <set>
-#include <unordered_map>
+#include <boost/multi_index_container.hpp>
#include <chrono>
+#include <functional>
#include <mutex>
+#include <set>
+#include <unordered_map>
namespace storage::distributor {
@@ -84,20 +85,20 @@ public:
* passing it to the given type checker.
* Breaks when the checker returns false.
*/
- void checkPendingMessages(uint16_t node, const document::Bucket &bucket, Checker& checker) const;
+ void checkPendingMessages(uint16_t node, const document::Bucket& bucket, Checker& checker) const;
/**
* Goes through each pending message (across all nodes) for the given bucket
* and invokes the given checker with the node, message type and priority.
* Breaks when the checker returns false.
*/
- void checkPendingMessages(const document::Bucket &bucket, Checker& checker) const;
+ void checkPendingMessages(const document::Bucket& bucket, Checker& checker) const;
/**
* Utility function for checking if there's a message of type
* messageType pending to bucket bid on the given node.
*/
- bool hasPendingMessage(uint16_t node, const document::Bucket &bucket, uint32_t messageType) const;
+ bool hasPendingMessage(uint16_t node, const document::Bucket& bucket, uint32_t messageType) const;
/**
* Returns a vector containing the number of pending messages to each storage node.
@@ -119,6 +120,17 @@ public:
void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task);
void abort_deferred_tasks();
+
+ /**
+ * For each distinct bucket with at least one pending message towards it:
+ *
+ * Iff `bucket_predicate(bucket) == true`, `msg_id_callback` is invoked once for _each_
+ * message towards `bucket`, with the message ID as the argument.
+ *
+ * Note: `bucket_predicate` is only invoked once per distinct bucket.
+ */
+ void enumerate_matching_pending_bucket_ops(const std::function<bool(const document::Bucket&)>& bucket_predicate,
+ const std::function<void(uint64_t)>& msg_id_callback) const;
private:
struct MessageEntry {
TimePoint timeStamp;
@@ -169,9 +181,11 @@ private:
>
>;
- using MessagesByMsgId = Messages::nth_index<0>::type;
- using MessagesByNodeAndBucket = Messages::nth_index<1>::type;
- using MessagesByBucketAndType = Messages::nth_index<2>::type;
+ // Must match Messages::nth_index<N>
+ static constexpr uint32_t IndexByMessageId = 0;
+ static constexpr uint32_t IndexByNodeAndBucket = 1;
+ static constexpr uint32_t IndexByBucketAndType = 2;
+
using DeferredBucketTaskMap = std::unordered_multimap<
document::Bucket,
std::unique_ptr<DeferredTask>,