summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-21 17:05:35 +0100
committerGitHub <noreply@github.com>2022-12-21 17:05:35 +0100
commite7efc2c90f128bc801ae29abe071c0871a277726 (patch)
tree00dadcd7e6023c82979d3bbe05b70ee8da52f242
parenta3d5e2ce14352ab181e940574f6e1a99d40cd520 (diff)
parent6d01030b16a77b86a9ca59029dced69bbacb1c83 (diff)
Merge pull request #25320 from vespa-engine/balder/use-transport-sync
Balder/use transport sync
-rw-r--r--fnet/src/vespa/fnet/transport.h8
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp35
3 files changed, 9 insertions, 38 deletions
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index d6e4aefb02b..3f9328296d6 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -89,7 +89,7 @@ private:
FNET_Config _config;
vespalib::AsyncResolver::SP _resolver;
vespalib::CryptoEngine::SP _crypto;
- fnet::TimeTools::SP _time_tools;
+ fnet::TimeTools::SP _time_tools;
uint32_t _num_threads;
};
@@ -108,10 +108,10 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
- fnet::TimeTools::SP _time_tools;
+ fnet::TimeTools::SP _time_tools;
std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
- Threads _threads;
- const FNET_Config _config;
+ Threads _threads;
+ const FNET_Config _config;
/**
* Wait for all pending resolve requests.
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index d46d174c670..590b6acbfff 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -127,7 +127,9 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
_queue.QueuePacket_NoLock(cpacket, context);
qLen = _queue.GetPacketCnt_NoLock();
}
- if (qLen == getConfig()._events_before_wakeup) {
+ if ((qLen == getConfig()._events_before_wakeup) ||
+ (cpacket->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE))
+ {
_selector.wakeup();
}
return true;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index bc3ba205b69..030e3f956e1 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -18,7 +18,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/gate.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -32,36 +31,6 @@ namespace mbus {
namespace {
-/**
- * Implements a helper class for {@link RPCNetwork#sync()}. It provides a
- * blocking method {@link #await()} that will wait until the internal state
- * of this object is set to 'done'. By scheduling this task in the network
- * thread and then calling this method, we achieve handshaking with the network
- * thread.
- */
-class SyncTask : public FNET_Task {
-private:
- vespalib::Gate _gate;
-
-public:
- SyncTask(FNET_Scheduler &s) :
- FNET_Task(&s),
- _gate() {
- ScheduleNow();
- }
- ~SyncTask() override {
- Kill();
- }
-
- void await() {
- _gate.await();
- }
-
- void PerformTask() override {
- _gate.countDown();
- }
-};
-
struct TargetPoolTask : public FNET_Task {
RPCTargetPool &_pool;
@@ -412,8 +381,8 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx)
void
RPCNetwork::sync()
{
- SyncTask task(_scheduler);
- task.await();
+ _transport->sync(); // Ensure transport loop run at least once to execute task scheduled for NOW
+ _transport->sync(); // And then once more to ensure they are all done.
}
void