diff options
Diffstat (limited to 'storage/src/vespa/storage/common/storagelinkqueued.h')
-rw-r--r-- | storage/src/vespa/storage/common/storagelinkqueued.h | 71 |
1 files changed, 18 insertions, 53 deletions
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h index 17a344a368a..3f7a831d9fe 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.h +++ b/storage/src/vespa/storage/common/storagelinkqueued.h @@ -1,25 +1,18 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::StorageLinkQueued - * @ingroup common - * - * @brief Storage link with a message queue. - * - * Storage link implementing separate threads for dispatching messages. + * Storage link implementing a separate thread for dispatching replies. * Using this class you can use dispatchReply instead of sendReply to have the * replies sent through another thread. - * - * @version $Id$ */ #pragma once #include "storagelink.h" #include <vespa/storageframework/generic/thread/runnable.h> +#include <condition_variable> #include <deque> #include <limits> #include <mutex> -#include <condition_variable> namespace storage { @@ -32,13 +25,7 @@ namespace framework { class StorageLinkQueued : public StorageLink { public: StorageLinkQueued(const std::string& name, framework::ComponentRegister& cr); - virtual ~StorageLinkQueued(); - - /** - * Add message to internal queue, to be dispatched downstream - * in separate thread. - */ - void dispatchDown(const std::shared_ptr<api::StorageMessage>&); + ~StorageLinkQueued() override; /** * Add message to internal queue, to be dispatched downstream @@ -48,14 +35,12 @@ public: /** Remember to call this method if you override it. */ void onClose() override { - _commandDispatcher.flush(); _closeState |= 1; } /** Remember to call this method if you override it. */ void onFlush(bool downwards) override { if (downwards) { - _commandDispatcher.flush(); _closeState |= 2; } else { _replyDispatcher.flush(); @@ -69,25 +54,25 @@ public: framework::ComponentRegister& getComponentRegister() { return _compReg; } private: - /** Common class to prevent need for duplicate code. */ template<typename Message> class Dispatcher : public framework::Runnable { protected: - StorageLinkQueued& _parent; - unsigned int _maxQueueSize; - std::mutex _sync; - std::condition_variable _syncCond; - std::deque< std::shared_ptr<Message> > _messages; - bool _replyDispatcher; + StorageLinkQueued& _parent; + unsigned int _maxQueueSize; + std::mutex _sync; + std::condition_variable _syncCond; + std::deque<std::shared_ptr<Message>> _messages; + bool _replyDispatcher; std::unique_ptr<framework::Component> _component; - std::unique_ptr<framework::Thread> _thread; - void terminate(); + std::unique_ptr<framework::Thread> _thread; + + void shutdown(); public: Dispatcher(StorageLinkQueued& parent, unsigned int maxQueueSize, bool replyDispatcher); - ~Dispatcher(); + ~Dispatcher() override; void start(); void run(framework::ThreadHandle&) override; @@ -98,10 +83,9 @@ private: virtual void send(const std::shared_ptr<Message> & ) = 0; }; - class ReplyDispatcher : public Dispatcher<api::StorageMessage> - { + class ReplyDispatcher : public Dispatcher<api::StorageMessage> { public: - ReplyDispatcher(StorageLinkQueued& parent) + explicit ReplyDispatcher(StorageLinkQueued& parent) : Dispatcher<api::StorageMessage>( parent, std::numeric_limits<unsigned int>::max(), true) { @@ -109,30 +93,11 @@ private: void send(const std::shared_ptr<api::StorageMessage> & reply) override { _parent.sendUp(reply); } - ~ReplyDispatcher() { terminate(); } - }; - - class CommandDispatcher : public Dispatcher<api::StorageMessage> - { - public: - CommandDispatcher(StorageLinkQueued& parent) - : Dispatcher<api::StorageMessage>( - parent, std::numeric_limits<unsigned int>::max(), false) - { - } - ~CommandDispatcher() { terminate(); } - void send(const std::shared_ptr<api::StorageMessage> & command) override { - _parent.sendDown(command); - } }; framework::ComponentRegister& _compReg; - ReplyDispatcher _replyDispatcher; - CommandDispatcher _commandDispatcher; - uint16_t _closeState; - -protected: - ReplyDispatcher& getReplyDispatcher() { return _replyDispatcher; } + ReplyDispatcher _replyDispatcher; + uint16_t _closeState; }; } |