aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-11 12:58:18 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-11 12:58:18 +0000
commit941b4d0a9d5729145f73d1f885b0ffbb9687f15e (patch)
tree6f14c1f3587320400833f98ffa1f3986c8721638 /storage
parent08708ba6add14d390c12b01805f4addc3df0507d (diff)
Move dummy mbus messages to shared header and use in MergeThrottler as well
Explicitly override `mbus::Message::getApproxSize()` to return 0 (instead of the default 1) to avoid mis-counting size between requests and responses. This has not mattered in practice since we haven't set any size-based limits (only message count limits), but we should ensure symmetry anyway.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/common/dummy_mbus_messages.h39
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.cpp25
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp25
3 files changed, 50 insertions, 39 deletions
diff --git a/storage/src/vespa/storage/common/dummy_mbus_messages.h b/storage/src/vespa/storage/common/dummy_mbus_messages.h
new file mode 100644
index 00000000000..1efaa6b654f
--- /dev/null
+++ b/storage/src/vespa/storage/common/dummy_mbus_messages.h
@@ -0,0 +1,39 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/messagebus/message.h>
+
+/**
+ * Dummy-implementation of mbus::Message and mbus::Reply to be used when interacting with
+ * MessageBus IThrottlePolicy subclasses, as these expect message instances as parameters.
+ */
+
+namespace storage {
+
+template <typename Base>
+class DummyMbusMessage : public Base {
+ static const mbus::string NAME;
+public:
+ const mbus::string& getProtocol() const override { return NAME; }
+ uint32_t getType() const override { return 0x1badb007; }
+ uint8_t priority() const override { return 255; }
+};
+
+template <typename Base>
+const mbus::string DummyMbusMessage<Base>::NAME = "FooBar";
+
+class DummyMbusRequest final : public DummyMbusMessage<mbus::Message> {
+public:
+ // getApproxSize() returns 1 by default.
+ // Approximate size of messages allowed by throttle policy is implicitly added to
+ // internal StaticThrottlePolicy pending size tracking and associated with the
+ // internal mbus context of the message.
+ // Since we have no connection between the request and reply instances used when
+ // interacting with the policy, we have to make sure they cancel each other out
+ // (i.e. += 0, -= 0).
+ // Not doing this would cause the StaticThrottlePolicy to keep adding a single byte
+ // of pending size for each message allowed by the policy.
+ uint32_t getApproxSize() const override { return 0; }
+};
+
+class DummyMbusReply final : public DummyMbusMessage<mbus::Reply> {};
+
+}
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
index 4c0fadc74b1..7b05decb851 100644
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "shared_operation_throttler.h"
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/messagebus/message.h>
+#include <vespa/storage/common/dummy_mbus_messages.h>
#include <condition_variable>
#include <cassert>
#include <mutex>
@@ -28,19 +28,6 @@ private:
void release_one() noexcept override { /* no-op */ }
};
-// Class used to sneakily get around IThrottlePolicy only accepting MBus objects
-template <typename Base>
-class DummyMbusMessage final : public Base {
- static const mbus::string NAME;
-public:
- const mbus::string& getProtocol() const override { return NAME; }
- uint32_t getType() const override { return 0x1badb007; }
- uint8_t priority() const override { return 255; }
-};
-
-template <typename Base>
-const mbus::string DummyMbusMessage<Base>::NAME = "FooBar";
-
class DynamicOperationThrottler final : public SharedOperationThrottler {
mutable std::mutex _mutex;
std::condition_variable _cond;
@@ -79,22 +66,22 @@ DynamicOperationThrottler::~DynamicOperationThrottler() = default;
bool
DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept
{
- DummyMbusMessage<mbus::Message> dummy_msg;
- return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ DummyMbusRequest dummy_request;
+ return _throttle_policy.canSend(dummy_request, _pending_ops);
}
void
DynamicOperationThrottler::add_one_to_active_window_size()
{
- DummyMbusMessage<mbus::Message> dummy_msg;
- _throttle_policy.processMessage(dummy_msg);
+ DummyMbusRequest dummy_request;
+ _throttle_policy.processMessage(dummy_request);
++_pending_ops;
}
void
DynamicOperationThrottler::subtract_one_from_active_window_size()
{
- DummyMbusMessage<mbus::Reply> dummy_reply;
+ DummyMbusReply dummy_reply;
_throttle_policy.processReply(dummy_reply);
assert(_pending_ops > 0);
--_pending_ops;
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index bc2f54e5a50..2a30acb1a74 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -2,6 +2,7 @@
#include "mergethrottler.h"
#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/common/dummy_mbus_messages.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/messagebus/message.h>
@@ -27,22 +28,6 @@ struct NodeComparator {
}
};
-// Class used to sneakily get around IThrottlePolicy only accepting
-// messagebus objects
-template <typename Base>
-class DummyMbusMessage : public Base {
-private:
- static const mbus::string NAME;
-public:
- const mbus::string& getProtocol() const override { return NAME; }
- uint32_t getType() const override { return 0x1badb007; }
-
- uint8_t priority() const override { return 255; }
-};
-
-template <typename Base>
-const mbus::string DummyMbusMessage<Base>::NAME = "SkyNet";
-
}
MergeThrottler::ChainedMergeState::ChainedMergeState()
@@ -310,7 +295,7 @@ MergeThrottler::onFlush(bool /*downwards*/)
"own the command", merge.first.toString().c_str());
}
- DummyMbusMessage<mbus::Reply> dummyReply;
+ DummyMbusReply dummyReply;
_throttlePolicy->processReply(dummyReply);
}
for (auto& entry : _queue) {
@@ -419,7 +404,7 @@ MergeThrottler::enqueue_merge_for_later_processing(
bool
MergeThrottler::canProcessNewMerge() const
{
- DummyMbusMessage<mbus::Message> dummyMsg;
+ DummyMbusRequest dummyMsg;
return _throttlePolicy->canSend(dummyMsg, _merges.size());
}
@@ -858,7 +843,7 @@ MergeThrottler::processNewMergeCommand(
LOG(debug, "Added merge %s to internal state",
mergeCmd.toString().c_str());
- DummyMbusMessage<mbus::Message> dummyMsg;
+ DummyMbusRequest dummyMsg;
_throttlePolicy->processMessage(dummyMsg);
bool execute = false;
@@ -1058,7 +1043,7 @@ MergeThrottler::processMergeReply(
updateOperationMetrics(mergeReply.getResult(), _metrics->local);
}
- DummyMbusMessage<mbus::Reply> dummyReply;
+ DummyMbusReply dummyReply;
if (mergeReply.getResult().failed()) {
// Must be sure to add an error if reply contained a failure, since
// DynamicThrottlePolicy penalizes on failed transmissions