diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-21 17:05:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-21 17:05:35 +0100 |
commit | e7efc2c90f128bc801ae29abe071c0871a277726 (patch) | |
tree | 00dadcd7e6023c82979d3bbe05b70ee8da52f242 | |
parent | a3d5e2ce14352ab181e940574f6e1a99d40cd520 (diff) | |
parent | 6d01030b16a77b86a9ca59029dced69bbacb1c83 (diff) |
Merge pull request #25320 from vespa-engine/balder/use-transport-sync
Balder/use transport sync
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 8 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 4 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 35 |
3 files changed, 9 insertions, 38 deletions
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index d6e4aefb02b..3f9328296d6 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -89,7 +89,7 @@ private: FNET_Config _config; vespalib::AsyncResolver::SP _resolver; vespalib::CryptoEngine::SP _crypto; - fnet::TimeTools::SP _time_tools; + fnet::TimeTools::SP _time_tools; uint32_t _num_threads; }; @@ -108,10 +108,10 @@ private: vespalib::AsyncResolver::SP _async_resolver; vespalib::CryptoEngine::SP _crypto_engine; - fnet::TimeTools::SP _time_tools; + fnet::TimeTools::SP _time_tools; std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; - Threads _threads; - const FNET_Config _config; + Threads _threads; + const FNET_Config _config; /** * Wait for all pending resolve requests. diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index d46d174c670..590b6acbfff 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -127,7 +127,9 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, _queue.QueuePacket_NoLock(cpacket, context); qLen = _queue.GetPacketCnt_NoLock(); } - if (qLen == getConfig()._events_before_wakeup) { + if ((qLen == getConfig()._events_before_wakeup) || + (cpacket->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE)) + { _selector.wakeup(); } return true; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index bc3ba205b69..030e3f956e1 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -18,7 +18,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/gate.h> #include <vespa/fastos/thread.h> #include <thread> @@ -32,36 +31,6 @@ namespace mbus { namespace { -/** - * Implements a helper class for {@link RPCNetwork#sync()}. It provides a - * blocking method {@link #await()} that will wait until the internal state - * of this object is set to 'done'. By scheduling this task in the network - * thread and then calling this method, we achieve handshaking with the network - * thread. - */ -class SyncTask : public FNET_Task { -private: - vespalib::Gate _gate; - -public: - SyncTask(FNET_Scheduler &s) : - FNET_Task(&s), - _gate() { - ScheduleNow(); - } - ~SyncTask() override { - Kill(); - } - - void await() { - _gate.await(); - } - - void PerformTask() override { - _gate.countDown(); - } -}; - struct TargetPoolTask : public FNET_Task { RPCTargetPool &_pool; @@ -412,8 +381,8 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx) void RPCNetwork::sync() { - SyncTask task(_scheduler); - task.await(); + _transport->sync(); // Ensure transport loop run at least once to execute task scheduled for NOW + _transport->sync(); // And then once more to ensure they are all done. } void |