aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-10-04 09:48:49 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-10-04 09:48:49 +0000
commit124de1533c52c740e1f3b2d0dd48285e47a049a5 (patch)
tree3272791f8695b0b4eb4cc9bd6210c8e0ab6a9991
parent00e5ee428268121c3a3fa6d9f740a451e57dc762 (diff)
Remove unused message dispatcher functionality
Only the reply dispatcher functionality is ever used. Also rename shutdown function to raise less eyebrows from a case of mistaken identity with `std::terminate`...
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.cpp18
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.h69
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.hpp14
3 files changed, 24 insertions, 77 deletions
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.cpp b/storage/src/vespa/storage/common/storagelinkqueued.cpp
index 2f116738c28..7f0caaae484 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.cpp
+++ b/storage/src/vespa/storage/common/storagelinkqueued.cpp
@@ -10,7 +10,6 @@ StorageLinkQueued::StorageLinkQueued(const std::string& name, framework::Compone
: StorageLink(name),
_compReg(cr),
_replyDispatcher(*this),
- _commandDispatcher(*this),
_closeState(0)
{ }
@@ -25,23 +24,6 @@ StorageLinkQueued::~StorageLinkQueued()
}
}
-void StorageLinkQueued::dispatchDown(
- const std::shared_ptr<api::StorageMessage>& msg)
-{
- // Verify acceptable state to send messages down
- switch(getState()) {
- case OPENED:
- case CLOSING:
- case FLUSHINGDOWN:
- break;
- default:
- LOG(error, "Link %s trying to dispatch %s down while in state %u",
- toString().c_str(), msg->toString().c_str(), getState());
- assert(false);
- }
- _commandDispatcher.add(msg);
-}
-
void StorageLinkQueued::dispatchUp(
const std::shared_ptr<api::StorageMessage>& msg)
{
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h
index 17a344a368a..24facbacb5e 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.h
+++ b/storage/src/vespa/storage/common/storagelinkqueued.h
@@ -1,25 +1,18 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::StorageLinkQueued
- * @ingroup common
- *
- * @brief Storage link with a message queue.
- *
- * Storage link implementing separate threads for dispatching messages.
+ * Storage link implementing a separate thread for dispatching replies.
* Using this class you can use dispatchReply instead of sendReply to have the
* replies sent through another thread.
- *
- * @version $Id$
*/
#pragma once
#include "storagelink.h"
#include <vespa/storageframework/generic/thread/runnable.h>
+#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>
-#include <condition_variable>
namespace storage {
@@ -32,13 +25,7 @@ namespace framework {
class StorageLinkQueued : public StorageLink {
public:
StorageLinkQueued(const std::string& name, framework::ComponentRegister& cr);
- virtual ~StorageLinkQueued();
-
- /**
- * Add message to internal queue, to be dispatched downstream
- * in separate thread.
- */
- void dispatchDown(const std::shared_ptr<api::StorageMessage>&);
+ ~StorageLinkQueued() override;
/**
* Add message to internal queue, to be dispatched downstream
@@ -48,14 +35,12 @@ public:
/** Remember to call this method if you override it. */
void onClose() override {
- _commandDispatcher.flush();
_closeState |= 1;
}
/** Remember to call this method if you override it. */
void onFlush(bool downwards) override {
if (downwards) {
- _commandDispatcher.flush();
_closeState |= 2;
} else {
_replyDispatcher.flush();
@@ -69,25 +54,25 @@ public:
framework::ComponentRegister& getComponentRegister() { return _compReg; }
private:
- /** Common class to prevent need for duplicate code. */
template<typename Message>
class Dispatcher : public framework::Runnable
{
protected:
- StorageLinkQueued& _parent;
- unsigned int _maxQueueSize;
- std::mutex _sync;
- std::condition_variable _syncCond;
- std::deque< std::shared_ptr<Message> > _messages;
- bool _replyDispatcher;
+ StorageLinkQueued& _parent;
+ unsigned int _maxQueueSize;
+ std::mutex _sync;
+ std::condition_variable _syncCond;
+ std::deque<std::shared_ptr<Message>> _messages;
+ bool _replyDispatcher;
std::unique_ptr<framework::Component> _component;
- std::unique_ptr<framework::Thread> _thread;
- void terminate();
+ std::unique_ptr<framework::Thread> _thread;
+
+ void shutdown();
public:
Dispatcher(StorageLinkQueued& parent, unsigned int maxQueueSize, bool replyDispatcher);
- ~Dispatcher();
+ ~Dispatcher() override;
void start();
void run(framework::ThreadHandle&) override;
@@ -98,10 +83,9 @@ private:
virtual void send(const std::shared_ptr<Message> & ) = 0;
};
- class ReplyDispatcher : public Dispatcher<api::StorageMessage>
- {
+ class ReplyDispatcher : public Dispatcher<api::StorageMessage> {
public:
- ReplyDispatcher(StorageLinkQueued& parent)
+ explicit ReplyDispatcher(StorageLinkQueued& parent)
: Dispatcher<api::StorageMessage>(
parent, std::numeric_limits<unsigned int>::max(), true)
{
@@ -109,30 +93,11 @@ private:
void send(const std::shared_ptr<api::StorageMessage> & reply) override {
_parent.sendUp(reply);
}
- ~ReplyDispatcher() { terminate(); }
- };
-
- class CommandDispatcher : public Dispatcher<api::StorageMessage>
- {
- public:
- CommandDispatcher(StorageLinkQueued& parent)
- : Dispatcher<api::StorageMessage>(
- parent, std::numeric_limits<unsigned int>::max(), false)
- {
- }
- ~CommandDispatcher() { terminate(); }
- void send(const std::shared_ptr<api::StorageMessage> & command) override {
- _parent.sendDown(command);
- }
};
framework::ComponentRegister& _compReg;
- ReplyDispatcher _replyDispatcher;
- CommandDispatcher _commandDispatcher;
- uint16_t _closeState;
-
-protected:
- ReplyDispatcher& getReplyDispatcher() { return _replyDispatcher; }
+ ReplyDispatcher _replyDispatcher;
+ uint16_t _closeState;
};
}
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp
index 01b6ae4a370..e78a68d2b8d 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.hpp
+++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp
@@ -14,11 +14,11 @@ namespace storage {
template<typename Message>
void
-StorageLinkQueued::Dispatcher<Message>::terminate() {
+StorageLinkQueued::Dispatcher<Message>::shutdown() {
if (_thread) {
_thread->interrupt();
{
- std::lock_guard<std::mutex> guard(_sync);
+ std::lock_guard guard(_sync);
_syncCond.notify_one();
}
_thread->join();
@@ -43,7 +43,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un
template<typename Message>
StorageLinkQueued::Dispatcher<Message>::~Dispatcher() {
- terminate();
+ shutdown();
}
template<typename Message>
@@ -56,7 +56,7 @@ void StorageLinkQueued::Dispatcher<Message>::start()
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::add(const std::shared_ptr<Message>& m)
{
- std::unique_lock<std::mutex> guard(_sync);
+ std::unique_lock guard(_sync);
if ( ! _thread) start();
while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) {
@@ -73,7 +73,7 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
h.registerTick(framework::PROCESS_CYCLE);
std::shared_ptr<Message> message;
{
- std::unique_lock<std::mutex> guard(_sync);
+ std::unique_lock guard(_sync);
while (!h.interrupted() && _messages.empty()) {
_syncCond.wait_for(guard, 100ms);
h.registerTick(framework::WAIT_CYCLE);
@@ -94,7 +94,7 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
{
// Since flush() only waits for stack to be empty, we must
// pop stack AFTER send have been called.
- std::lock_guard<std::mutex> guard(_sync);
+ std::lock_guard guard(_sync);
_messages.pop_front();
_syncCond.notify_one();
}
@@ -106,7 +106,7 @@ template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::flush()
{
using namespace std::chrono_literals;
- std::unique_lock<std::mutex> guard(_sync);
+ std::unique_lock guard(_sync);
while (!_messages.empty()) {
_syncCond.wait_for(guard, 100ms);
}