aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/common/storagelinkqueued.h
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storage/common/storagelinkqueued.h')
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.h71
1 files changed, 18 insertions, 53 deletions
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h
index 17a344a368a..3f7a831d9fe 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.h
+++ b/storage/src/vespa/storage/common/storagelinkqueued.h
@@ -1,25 +1,18 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::StorageLinkQueued
- * @ingroup common
- *
- * @brief Storage link with a message queue.
- *
- * Storage link implementing separate threads for dispatching messages.
+ * Storage link implementing a separate thread for dispatching replies.
* Using this class you can use dispatchReply instead of sendReply to have the
* replies sent through another thread.
- *
- * @version $Id$
*/
#pragma once
#include "storagelink.h"
#include <vespa/storageframework/generic/thread/runnable.h>
+#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>
-#include <condition_variable>
namespace storage {
@@ -32,13 +25,7 @@ namespace framework {
class StorageLinkQueued : public StorageLink {
public:
StorageLinkQueued(const std::string& name, framework::ComponentRegister& cr);
- virtual ~StorageLinkQueued();
-
- /**
- * Add message to internal queue, to be dispatched downstream
- * in separate thread.
- */
- void dispatchDown(const std::shared_ptr<api::StorageMessage>&);
+ ~StorageLinkQueued() override;
/**
* Add message to internal queue, to be dispatched downstream
@@ -48,14 +35,12 @@ public:
/** Remember to call this method if you override it. */
void onClose() override {
- _commandDispatcher.flush();
_closeState |= 1;
}
/** Remember to call this method if you override it. */
void onFlush(bool downwards) override {
if (downwards) {
- _commandDispatcher.flush();
_closeState |= 2;
} else {
_replyDispatcher.flush();
@@ -69,25 +54,25 @@ public:
framework::ComponentRegister& getComponentRegister() { return _compReg; }
private:
- /** Common class to prevent need for duplicate code. */
template<typename Message>
class Dispatcher : public framework::Runnable
{
protected:
- StorageLinkQueued& _parent;
- unsigned int _maxQueueSize;
- std::mutex _sync;
- std::condition_variable _syncCond;
- std::deque< std::shared_ptr<Message> > _messages;
- bool _replyDispatcher;
+ StorageLinkQueued& _parent;
+ unsigned int _maxQueueSize;
+ std::mutex _sync;
+ std::condition_variable _syncCond;
+ std::deque<std::shared_ptr<Message>> _messages;
+ bool _replyDispatcher;
std::unique_ptr<framework::Component> _component;
- std::unique_ptr<framework::Thread> _thread;
- void terminate();
+ std::unique_ptr<framework::Thread> _thread;
+
+ void shutdown();
public:
Dispatcher(StorageLinkQueued& parent, unsigned int maxQueueSize, bool replyDispatcher);
- ~Dispatcher();
+ ~Dispatcher() override;
void start();
void run(framework::ThreadHandle&) override;
@@ -98,10 +83,9 @@ private:
virtual void send(const std::shared_ptr<Message> & ) = 0;
};
- class ReplyDispatcher : public Dispatcher<api::StorageMessage>
- {
+ class ReplyDispatcher : public Dispatcher<api::StorageMessage> {
public:
- ReplyDispatcher(StorageLinkQueued& parent)
+ explicit ReplyDispatcher(StorageLinkQueued& parent)
: Dispatcher<api::StorageMessage>(
parent, std::numeric_limits<unsigned int>::max(), true)
{
@@ -109,30 +93,11 @@ private:
void send(const std::shared_ptr<api::StorageMessage> & reply) override {
_parent.sendUp(reply);
}
- ~ReplyDispatcher() { terminate(); }
- };
-
- class CommandDispatcher : public Dispatcher<api::StorageMessage>
- {
- public:
- CommandDispatcher(StorageLinkQueued& parent)
- : Dispatcher<api::StorageMessage>(
- parent, std::numeric_limits<unsigned int>::max(), false)
- {
- }
- ~CommandDispatcher() { terminate(); }
- void send(const std::shared_ptr<api::StorageMessage> & command) override {
- _parent.sendDown(command);
- }
};
framework::ComponentRegister& _compReg;
- ReplyDispatcher _replyDispatcher;
- CommandDispatcher _commandDispatcher;
- uint16_t _closeState;
-
-protected:
- ReplyDispatcher& getReplyDispatcher() { return _replyDispatcher; }
+ ReplyDispatcher _replyDispatcher;
+ uint16_t _closeState;
};
}