diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-11 12:58:18 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-11 12:58:18 +0000 |
commit | 941b4d0a9d5729145f73d1f885b0ffbb9687f15e (patch) | |
tree | 6f14c1f3587320400833f98ffa1f3986c8721638 /storage | |
parent | 08708ba6add14d390c12b01805f4addc3df0507d (diff) |
Move dummy mbus messages to shared header and use in MergeThrottler as well
Explicitly override `mbus::Message::getApproxSize()` to return 0 (instead
of the default 1) to avoid mis-counting size between requests and responses.
This has not mattered in practice since we haven't set any size-based
limits (only message count limits), but we should ensure symmetry anyway.
Diffstat (limited to 'storage')
3 files changed, 50 insertions, 39 deletions
diff --git a/storage/src/vespa/storage/common/dummy_mbus_messages.h b/storage/src/vespa/storage/common/dummy_mbus_messages.h new file mode 100644 index 00000000000..1efaa6b654f --- /dev/null +++ b/storage/src/vespa/storage/common/dummy_mbus_messages.h @@ -0,0 +1,39 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/messagebus/message.h> + +/** + * Dummy-implementation of mbus::Message and mbus::Reply to be used when interacting with + * MessageBus IThrottlePolicy subclasses, as these expect message instances as parameters. + */ + +namespace storage { + +template <typename Base> +class DummyMbusMessage : public Base { + static const mbus::string NAME; +public: + const mbus::string& getProtocol() const override { return NAME; } + uint32_t getType() const override { return 0x1badb007; } + uint8_t priority() const override { return 255; } +}; + +template <typename Base> +const mbus::string DummyMbusMessage<Base>::NAME = "FooBar"; + +class DummyMbusRequest final : public DummyMbusMessage<mbus::Message> { +public: + // getApproxSize() returns 1 by default. + // Approximate size of messages allowed by throttle policy is implicitly added to + // internal StaticThrottlePolicy pending size tracking and associated with the + // internal mbus context of the message. + // Since we have no connection between the request and reply instances used when + // interacting with the policy, we have to make sure they cancel each other out + // (i.e. += 0, -= 0). + // Not doing this would cause the StaticThrottlePolicy to keep adding a single byte + // of pending size for each message allowed by the policy. + uint32_t getApproxSize() const override { return 0; } +}; + +class DummyMbusReply final : public DummyMbusMessage<mbus::Reply> {}; + +} diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp index 4c0fadc74b1..7b05decb851 100644 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "shared_operation_throttler.h" #include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/messagebus/message.h> +#include <vespa/storage/common/dummy_mbus_messages.h> #include <condition_variable> #include <cassert> #include <mutex> @@ -28,19 +28,6 @@ private: void release_one() noexcept override { /* no-op */ } }; -// Class used to sneakily get around IThrottlePolicy only accepting MBus objects -template <typename Base> -class DummyMbusMessage final : public Base { - static const mbus::string NAME; -public: - const mbus::string& getProtocol() const override { return NAME; } - uint32_t getType() const override { return 0x1badb007; } - uint8_t priority() const override { return 255; } -}; - -template <typename Base> -const mbus::string DummyMbusMessage<Base>::NAME = "FooBar"; - class DynamicOperationThrottler final : public SharedOperationThrottler { mutable std::mutex _mutex; std::condition_variable _cond; @@ -79,22 +66,22 @@ DynamicOperationThrottler::~DynamicOperationThrottler() = default; bool DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept { - DummyMbusMessage<mbus::Message> dummy_msg; - return _throttle_policy.canSend(dummy_msg, _pending_ops); + DummyMbusRequest dummy_request; + return _throttle_policy.canSend(dummy_request, _pending_ops); } void DynamicOperationThrottler::add_one_to_active_window_size() { - DummyMbusMessage<mbus::Message> dummy_msg; - _throttle_policy.processMessage(dummy_msg); + DummyMbusRequest dummy_request; + _throttle_policy.processMessage(dummy_request); ++_pending_ops; } void DynamicOperationThrottler::subtract_one_from_active_window_size() { - DummyMbusMessage<mbus::Reply> dummy_reply; + DummyMbusReply dummy_reply; _throttle_policy.processReply(dummy_reply); assert(_pending_ops > 0); --_pending_ops; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index bc2f54e5a50..2a30acb1a74 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -2,6 +2,7 @@ #include "mergethrottler.h" #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/common/dummy_mbus_messages.h> #include <vespa/storage/persistence/messages.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/messagebus/message.h> @@ -27,22 +28,6 @@ struct NodeComparator { } }; -// Class used to sneakily get around IThrottlePolicy only accepting -// messagebus objects -template <typename Base> -class DummyMbusMessage : public Base { -private: - static const mbus::string NAME; -public: - const mbus::string& getProtocol() const override { return NAME; } - uint32_t getType() const override { return 0x1badb007; } - - uint8_t priority() const override { return 255; } -}; - -template <typename Base> -const mbus::string DummyMbusMessage<Base>::NAME = "SkyNet"; - } MergeThrottler::ChainedMergeState::ChainedMergeState() @@ -310,7 +295,7 @@ MergeThrottler::onFlush(bool /*downwards*/) "own the command", merge.first.toString().c_str()); } - DummyMbusMessage<mbus::Reply> dummyReply; + DummyMbusReply dummyReply; _throttlePolicy->processReply(dummyReply); } for (auto& entry : _queue) { @@ -419,7 +404,7 @@ MergeThrottler::enqueue_merge_for_later_processing( bool MergeThrottler::canProcessNewMerge() const { - DummyMbusMessage<mbus::Message> dummyMsg; + DummyMbusRequest dummyMsg; return _throttlePolicy->canSend(dummyMsg, _merges.size()); } @@ -858,7 +843,7 @@ MergeThrottler::processNewMergeCommand( LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str()); - DummyMbusMessage<mbus::Message> dummyMsg; + DummyMbusRequest dummyMsg; _throttlePolicy->processMessage(dummyMsg); bool execute = false; @@ -1058,7 +1043,7 @@ MergeThrottler::processMergeReply( updateOperationMetrics(mergeReply.getResult(), _metrics->local); } - DummyMbusMessage<mbus::Reply> dummyReply; + DummyMbusReply dummyReply; if (mergeReply.getResult().failed()) { // Must be sure to add an error if reply contained a failure, since // DynamicThrottlePolicy penalizes on failed transmissions |