diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-10-04 09:48:49 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-10-04 09:48:49 +0000 |
commit | 124de1533c52c740e1f3b2d0dd48285e47a049a5 (patch) | |
tree | 3272791f8695b0b4eb4cc9bd6210c8e0ab6a9991 | |
parent | 00e5ee428268121c3a3fa6d9f740a451e57dc762 (diff) |
Remove unused message dispatcher functionality
Only the reply dispatcher functionality is ever used.
Also rename shutdown function to raise less eyebrows from a case
of mistaken identity with `std::terminate`...
-rw-r--r-- | storage/src/vespa/storage/common/storagelinkqueued.cpp | 18 | ||||
-rw-r--r-- | storage/src/vespa/storage/common/storagelinkqueued.h | 69 | ||||
-rw-r--r-- | storage/src/vespa/storage/common/storagelinkqueued.hpp | 14 |
3 files changed, 24 insertions, 77 deletions
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.cpp b/storage/src/vespa/storage/common/storagelinkqueued.cpp index 2f116738c28..7f0caaae484 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.cpp +++ b/storage/src/vespa/storage/common/storagelinkqueued.cpp @@ -10,7 +10,6 @@ StorageLinkQueued::StorageLinkQueued(const std::string& name, framework::Compone : StorageLink(name), _compReg(cr), _replyDispatcher(*this), - _commandDispatcher(*this), _closeState(0) { } @@ -25,23 +24,6 @@ StorageLinkQueued::~StorageLinkQueued() } } -void StorageLinkQueued::dispatchDown( - const std::shared_ptr<api::StorageMessage>& msg) -{ - // Verify acceptable state to send messages down - switch(getState()) { - case OPENED: - case CLOSING: - case FLUSHINGDOWN: - break; - default: - LOG(error, "Link %s trying to dispatch %s down while in state %u", - toString().c_str(), msg->toString().c_str(), getState()); - assert(false); - } - _commandDispatcher.add(msg); -} - void StorageLinkQueued::dispatchUp( const std::shared_ptr<api::StorageMessage>& msg) { diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h index 17a344a368a..24facbacb5e 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.h +++ b/storage/src/vespa/storage/common/storagelinkqueued.h @@ -1,25 +1,18 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::StorageLinkQueued - * @ingroup common - * - * @brief Storage link with a message queue. - * - * Storage link implementing separate threads for dispatching messages. + * Storage link implementing a separate thread for dispatching replies. * Using this class you can use dispatchReply instead of sendReply to have the * replies sent through another thread. - * - * @version $Id$ */ #pragma once #include "storagelink.h" #include <vespa/storageframework/generic/thread/runnable.h> +#include <condition_variable> #include <deque> #include <limits> #include <mutex> -#include <condition_variable> namespace storage { @@ -32,13 +25,7 @@ namespace framework { class StorageLinkQueued : public StorageLink { public: StorageLinkQueued(const std::string& name, framework::ComponentRegister& cr); - virtual ~StorageLinkQueued(); - - /** - * Add message to internal queue, to be dispatched downstream - * in separate thread. - */ - void dispatchDown(const std::shared_ptr<api::StorageMessage>&); + ~StorageLinkQueued() override; /** * Add message to internal queue, to be dispatched downstream @@ -48,14 +35,12 @@ public: /** Remember to call this method if you override it. */ void onClose() override { - _commandDispatcher.flush(); _closeState |= 1; } /** Remember to call this method if you override it. */ void onFlush(bool downwards) override { if (downwards) { - _commandDispatcher.flush(); _closeState |= 2; } else { _replyDispatcher.flush(); @@ -69,25 +54,25 @@ public: framework::ComponentRegister& getComponentRegister() { return _compReg; } private: - /** Common class to prevent need for duplicate code. */ template<typename Message> class Dispatcher : public framework::Runnable { protected: - StorageLinkQueued& _parent; - unsigned int _maxQueueSize; - std::mutex _sync; - std::condition_variable _syncCond; - std::deque< std::shared_ptr<Message> > _messages; - bool _replyDispatcher; + StorageLinkQueued& _parent; + unsigned int _maxQueueSize; + std::mutex _sync; + std::condition_variable _syncCond; + std::deque<std::shared_ptr<Message>> _messages; + bool _replyDispatcher; std::unique_ptr<framework::Component> _component; - std::unique_ptr<framework::Thread> _thread; - void terminate(); + std::unique_ptr<framework::Thread> _thread; + + void shutdown(); public: Dispatcher(StorageLinkQueued& parent, unsigned int maxQueueSize, bool replyDispatcher); - ~Dispatcher(); + ~Dispatcher() override; void start(); void run(framework::ThreadHandle&) override; @@ -98,10 +83,9 @@ private: virtual void send(const std::shared_ptr<Message> & ) = 0; }; - class ReplyDispatcher : public Dispatcher<api::StorageMessage> - { + class ReplyDispatcher : public Dispatcher<api::StorageMessage> { public: - ReplyDispatcher(StorageLinkQueued& parent) + explicit ReplyDispatcher(StorageLinkQueued& parent) : Dispatcher<api::StorageMessage>( parent, std::numeric_limits<unsigned int>::max(), true) { @@ -109,30 +93,11 @@ private: void send(const std::shared_ptr<api::StorageMessage> & reply) override { _parent.sendUp(reply); } - ~ReplyDispatcher() { terminate(); } - }; - - class CommandDispatcher : public Dispatcher<api::StorageMessage> - { - public: - CommandDispatcher(StorageLinkQueued& parent) - : Dispatcher<api::StorageMessage>( - parent, std::numeric_limits<unsigned int>::max(), false) - { - } - ~CommandDispatcher() { terminate(); } - void send(const std::shared_ptr<api::StorageMessage> & command) override { - _parent.sendDown(command); - } }; framework::ComponentRegister& _compReg; - ReplyDispatcher _replyDispatcher; - CommandDispatcher _commandDispatcher; - uint16_t _closeState; - -protected: - ReplyDispatcher& getReplyDispatcher() { return _replyDispatcher; } + ReplyDispatcher _replyDispatcher; + uint16_t _closeState; }; } diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp index 01b6ae4a370..e78a68d2b8d 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.hpp +++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp @@ -14,11 +14,11 @@ namespace storage { template<typename Message> void -StorageLinkQueued::Dispatcher<Message>::terminate() { +StorageLinkQueued::Dispatcher<Message>::shutdown() { if (_thread) { _thread->interrupt(); { - std::lock_guard<std::mutex> guard(_sync); + std::lock_guard guard(_sync); _syncCond.notify_one(); } _thread->join(); @@ -43,7 +43,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un template<typename Message> StorageLinkQueued::Dispatcher<Message>::~Dispatcher() { - terminate(); + shutdown(); } template<typename Message> @@ -56,7 +56,7 @@ void StorageLinkQueued::Dispatcher<Message>::start() template<typename Message> void StorageLinkQueued::Dispatcher<Message>::add(const std::shared_ptr<Message>& m) { - std::unique_lock<std::mutex> guard(_sync); + std::unique_lock guard(_sync); if ( ! _thread) start(); while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) { @@ -73,7 +73,7 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) h.registerTick(framework::PROCESS_CYCLE); std::shared_ptr<Message> message; { - std::unique_lock<std::mutex> guard(_sync); + std::unique_lock guard(_sync); while (!h.interrupted() && _messages.empty()) { _syncCond.wait_for(guard, 100ms); h.registerTick(framework::WAIT_CYCLE); @@ -94,7 +94,7 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) { // Since flush() only waits for stack to be empty, we must // pop stack AFTER send have been called. - std::lock_guard<std::mutex> guard(_sync); + std::lock_guard guard(_sync); _messages.pop_front(); _syncCond.notify_one(); } @@ -106,7 +106,7 @@ template<typename Message> void StorageLinkQueued::Dispatcher<Message>::flush() { using namespace std::chrono_literals; - std::unique_lock<std::mutex> guard(_sync); + std::unique_lock guard(_sync); while (!_messages.empty()) { _syncCond.wait_for(guard, 100ms); } |