summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fnet/src/vespa/fnet/transport.h8
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp35
-rw-r--r--vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp23
-rw-r--r--vespalib/src/vespa/vespalib/util/invokeserviceimpl.h2
5 files changed, 21 insertions, 51 deletions
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index d6e4aefb02b..3f9328296d6 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -89,7 +89,7 @@ private:
FNET_Config _config;
vespalib::AsyncResolver::SP _resolver;
vespalib::CryptoEngine::SP _crypto;
- fnet::TimeTools::SP _time_tools;
+ fnet::TimeTools::SP _time_tools;
uint32_t _num_threads;
};
@@ -108,10 +108,10 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
- fnet::TimeTools::SP _time_tools;
+ fnet::TimeTools::SP _time_tools;
std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
- Threads _threads;
- const FNET_Config _config;
+ Threads _threads;
+ const FNET_Config _config;
/**
* Wait for all pending resolve requests.
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index d46d174c670..590b6acbfff 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -127,7 +127,9 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
_queue.QueuePacket_NoLock(cpacket, context);
qLen = _queue.GetPacketCnt_NoLock();
}
- if (qLen == getConfig()._events_before_wakeup) {
+ if ((qLen == getConfig()._events_before_wakeup) ||
+ (cpacket->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE))
+ {
_selector.wakeup();
}
return true;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index bc3ba205b69..030e3f956e1 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -18,7 +18,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/gate.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -32,36 +31,6 @@ namespace mbus {
namespace {
-/**
- * Implements a helper class for {@link RPCNetwork#sync()}. It provides a
- * blocking method {@link #await()} that will wait until the internal state
- * of this object is set to 'done'. By scheduling this task in the network
- * thread and then calling this method, we achieve handshaking with the network
- * thread.
- */
-class SyncTask : public FNET_Task {
-private:
- vespalib::Gate _gate;
-
-public:
- SyncTask(FNET_Scheduler &s) :
- FNET_Task(&s),
- _gate() {
- ScheduleNow();
- }
- ~SyncTask() override {
- Kill();
- }
-
- void await() {
- _gate.await();
- }
-
- void PerformTask() override {
- _gate.countDown();
- }
-};
-
struct TargetPoolTask : public FNET_Task {
RPCTargetPool &_pool;
@@ -412,8 +381,8 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx)
void
RPCNetwork::sync()
{
- SyncTask task(_scheduler);
- task.await();
+ _transport->sync(); // Ensure transport loop run at least once to execute task scheduled for NOW
+ _transport->sync(); // And then once more to ensure they are all done.
}
void
diff --git a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp
index c10029b2f58..d4a4164fbf4 100644
--- a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp
+++ b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.cpp
@@ -9,6 +9,7 @@ InvokeServiceImpl::InvokeServiceImpl(duration napTime)
: _naptime(napTime),
_now(steady_clock::now()),
_lock(),
+ _cond(),
_currId(0),
_closed(false),
_toInvoke(),
@@ -22,6 +23,7 @@ InvokeServiceImpl::~InvokeServiceImpl()
std::lock_guard guard(_lock);
assert(_toInvoke.empty());
_closed = true;
+ _cond.notify_all();
}
_thread->join();
}
@@ -39,7 +41,7 @@ public:
}
private:
InvokeServiceImpl * _service;
- uint64_t _id;
+ uint64_t _id;
};
std::unique_ptr<IDestructorCallback>
@@ -47,6 +49,7 @@ InvokeServiceImpl::registerInvoke(InvokeFunc func) {
std::lock_guard guard(_lock);
uint64_t id = _currId++;
_toInvoke.emplace_back(id, std::move(func));
+ _cond.notify_all();
return std::make_unique<Registration>(this, id);
}
@@ -58,25 +61,19 @@ InvokeServiceImpl::unregister(uint64_t id) {
});
assert (found != _toInvoke.end());
_toInvoke.erase(found);
+ _cond.notify_all();
}
void
InvokeServiceImpl::runLoop() {
- bool done = false;
- while ( ! done ) {
+ std::unique_lock guard(_lock);
+ while ( ! _closed ) {
_now.store(steady_clock::now(), std::memory_order_relaxed);
- {
- std::lock_guard guard(_lock);
- for (auto & func: _toInvoke) {
- func.second();
- }
- done = _closed;
- }
- if ( ! done) {
- std::this_thread::sleep_for(_naptime);
+ for (auto & func: _toInvoke) {
+ func.second();
}
+ _cond.wait_for(guard, _naptime);
}
-
}
}
diff --git a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h
index beb57ca1ce0..ef0755e7614 100644
--- a/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h
+++ b/vespalib/src/vespa/vespalib/util/invokeserviceimpl.h
@@ -7,6 +7,7 @@
#include <mutex>
#include <vector>
#include <thread>
+#include <condition_variable>
namespace vespalib {
@@ -29,6 +30,7 @@ private:
duration _naptime;
std::atomic<steady_time> _now;
std::mutex _lock;
+ std::condition_variable _cond;
uint64_t _currId;
bool _closed;
std::vector<IdAndFunc> _toInvoke;