diff options
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 8 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 4 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 35 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp | 23 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/invokeserviceimpl.h | 2 |
5 files changed, 21 insertions, 51 deletions
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index d6e4aefb02b..3f9328296d6 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -89,7 +89,7 @@ private: FNET_Config _config; vespalib::AsyncResolver::SP _resolver; vespalib::CryptoEngine::SP _crypto; - fnet::TimeTools::SP _time_tools; + fnet::TimeTools::SP _time_tools; uint32_t _num_threads; }; @@ -108,10 +108,10 @@ private: vespalib::AsyncResolver::SP _async_resolver; vespalib::CryptoEngine::SP _crypto_engine; - fnet::TimeTools::SP _time_tools; + fnet::TimeTools::SP _time_tools; std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; - Threads _threads; - const FNET_Config _config; + Threads _threads; + const FNET_Config _config; /** * Wait for all pending resolve requests. diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index d46d174c670..590b6acbfff 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -127,7 +127,9 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, _queue.QueuePacket_NoLock(cpacket, context); qLen = _queue.GetPacketCnt_NoLock(); } - if (qLen == getConfig()._events_before_wakeup) { + if ((qLen == getConfig()._events_before_wakeup) || + (cpacket->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE)) + { _selector.wakeup(); } return true; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index bc3ba205b69..030e3f956e1 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -18,7 +18,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/gate.h> #include <vespa/fastos/thread.h> #include <thread> @@ -32,36 +31,6 @@ namespace mbus { namespace { -/** - * Implements a helper class for {@link RPCNetwork#sync()}. It provides a - * blocking method {@link #await()} that will wait until the internal state - * of this object is set to 'done'. By scheduling this task in the network - * thread and then calling this method, we achieve handshaking with the network - * thread. - */ -class SyncTask : public FNET_Task { -private: - vespalib::Gate _gate; - -public: - SyncTask(FNET_Scheduler &s) : - FNET_Task(&s), - _gate() { - ScheduleNow(); - } - ~SyncTask() override { - Kill(); - } - - void await() { - _gate.await(); - } - - void PerformTask() override { - _gate.countDown(); - } -}; - struct TargetPoolTask : public FNET_Task { RPCTargetPool &_pool; @@ -412,8 +381,8 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx) void RPCNetwork::sync() { - SyncTask task(_scheduler); - task.await(); + _transport->sync(); // Ensure transport loop run at least once to execute task scheduled for NOW + _transport->sync(); // And then once more to ensure they are all done. } void diff --git a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp index c10029b2f58..d4a4164fbf4 100644 --- a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp +++ b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp @@ -9,6 +9,7 @@ InvokeServiceImpl::InvokeServiceImpl(duration napTime) : _naptime(napTime), _now(steady_clock::now()), _lock(), + _cond(), _currId(0), _closed(false), _toInvoke(), @@ -22,6 +23,7 @@ InvokeServiceImpl::~InvokeServiceImpl() std::lock_guard guard(_lock); assert(_toInvoke.empty()); _closed = true; + _cond.notify_all(); } _thread->join(); } @@ -39,7 +41,7 @@ public: } private: InvokeServiceImpl * _service; - uint64_t _id; + uint64_t _id; }; std::unique_ptr<IDestructorCallback> @@ -47,6 +49,7 @@ InvokeServiceImpl::registerInvoke(InvokeFunc func) { std::lock_guard guard(_lock); uint64_t id = _currId++; _toInvoke.emplace_back(id, std::move(func)); + _cond.notify_all(); return std::make_unique<Registration>(this, id); } @@ -58,25 +61,19 @@ InvokeServiceImpl::unregister(uint64_t id) { }); assert (found != _toInvoke.end()); _toInvoke.erase(found); + _cond.notify_all(); } void InvokeServiceImpl::runLoop() { - bool done = false; - while ( ! done ) { + std::unique_lock guard(_lock); + while ( ! _closed ) { _now.store(steady_clock::now(), std::memory_order_relaxed); - { - std::lock_guard guard(_lock); - for (auto & func: _toInvoke) { - func.second(); - } - done = _closed; - } - if ( ! done) { - std::this_thread::sleep_for(_naptime); + for (auto & func: _toInvoke) { + func.second(); } + _cond.wait_for(guard, _naptime); } - } } diff --git a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h index beb57ca1ce0..ef0755e7614 100644 --- a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h +++ b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h @@ -7,6 +7,7 @@ #include <mutex> #include <vector> #include <thread> +#include <condition_variable> namespace vespalib { @@ -29,6 +30,7 @@ private: duration _naptime; std::atomic<steady_time> _now; std::mutex _lock; + std::condition_variable _cond; uint64_t _currId; bool _closed; std::vector<IdAndFunc> _toInvoke; |