// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "storagelinkqueued.h" #include #include #include #include #include #include namespace storage { template void StorageLinkQueued::Dispatcher::shutdown() { if (_thread) { _thread->interrupt(); { std::lock_guard guard(_sync); _syncCond.notify_one(); } _thread->join(); _thread.reset(); } } template StorageLinkQueued::Dispatcher::Dispatcher(StorageLinkQueued& parent, unsigned int maxQueueSize, bool replyDispatcher) : _parent(parent), _maxQueueSize(maxQueueSize), _sync(), _syncCond(), _messages(), _replyDispatcher(replyDispatcher) { std::ostringstream name; name << "Queued storage " << (_replyDispatcher ? "up" : "down") << "link - " << _parent.getName(); _component = std::make_unique(parent.getComponentRegister(), name.str()); } template StorageLinkQueued::Dispatcher::~Dispatcher() { shutdown(); } template void StorageLinkQueued::Dispatcher::start() { assert( ! _thread); _thread = _component->startThread(*this, 5s, 100ms); } template void StorageLinkQueued::Dispatcher::add(const std::shared_ptr& m) { std::unique_lock guard(_sync); if ( ! _thread) start(); while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) { _syncCond.wait_for(guard, 100ms); } _messages.push_back(m); _syncCond.notify_one(); } template void StorageLinkQueued::Dispatcher::run(framework::ThreadHandle& h) { while (!h.interrupted()) { h.registerTick(framework::PROCESS_CYCLE); std::shared_ptr message; { std::unique_lock guard(_sync); while (!h.interrupted() && _messages.empty()) { _syncCond.wait_for(guard, 100ms); h.registerTick(framework::WAIT_CYCLE); } if (h.interrupted()) break; message.swap(_messages.front()); } try { send(message); } catch (std::exception& e) { _parent.logError(vespalib::make_string( "When running command %s, caught exception %s. " "Discarding message", message->toString().c_str(), e.what()).c_str()); } { // Since flush() only waits for stack to be empty, we must // pop stack AFTER send have been called. std::lock_guard guard(_sync); _messages.pop_front(); _syncCond.notify_one(); } } _parent.logDebug("Finished storage link queued thread"); } template void StorageLinkQueued::Dispatcher::flush() { using namespace std::chrono_literals; std::unique_lock guard(_sync); while (!_messages.empty()) { _syncCond.wait_for(guard, 100ms); } } }