summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fnet/src/vespa/fnet/transport.cpp14
-rw-r--r--fnet/src/vespa/fnet/transport.h9
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp15
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h14
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h41
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp1
-rw-r--r--vespalib/src/vespa/vespalib/net/async_resolver.cpp10
-rw-r--r--vespalib/src/vespa/vespalib/net/async_resolver.h12
9 files changed, 68 insertions, 49 deletions
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index d3b52969c8c..e59186069ce 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -3,6 +3,7 @@
#include "transport.h"
#include "transport_thread.h"
#include "iocomponent.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <chrono>
#include <xxhash.h>
@@ -27,12 +28,13 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool);
FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads)
: _async_resolver(std::move(resolver)),
_crypto_engine(std::move(crypto)),
- _work_pool(1, 128 * 1024, fnet_work_pool, 1024),
- _threads()
+ _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)),
+ _threads(),
+ _events_before_wakeup(1)
{
assert(num_threads >= 1);
for (size_t i = 0; i < num_threads; ++i) {
- _threads.emplace_back(new FNET_TransportThread(*this));
+ _threads.emplace_back(std::make_unique<FNET_TransportThread>(*this));
}
}
@@ -41,7 +43,7 @@ FNET_Transport::~FNET_Transport() = default;
void
FNET_Transport::post_or_perform(vespalib::Executor::Task::UP task)
{
- if (auto rejected = _work_pool.execute(std::move(task))) {
+ if (auto rejected = _work_pool->execute(std::move(task))) {
rejected->run();
}
}
@@ -161,7 +163,7 @@ FNET_Transport::ShutDown(bool waitFinished)
}
if (waitFinished) {
_async_resolver->wait_for_pending_resolves();
- _work_pool.shutdown().sync();
+ _work_pool->shutdown().sync();
}
}
@@ -172,7 +174,7 @@ FNET_Transport::WaitFinished()
thread->WaitFinished();
}
_async_resolver->wait_for_pending_resolves();
- _work_pool.shutdown().sync();
+ _work_pool->shutdown().sync();
}
bool
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 02ef22c7fb6..1126f68e69f 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -7,7 +7,6 @@
#include <vector>
#include <vespa/vespalib/net/async_resolver.h>
#include <vespa/vespalib/net/crypto_engine.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
class FNET_TransportThread;
class FastOS_ThreadPool;
@@ -30,8 +29,9 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
- vespalib::ThreadStackExecutor _work_pool;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
Threads _threads;
+ size_t _events_before_wakeup;
public:
/**
@@ -53,6 +53,11 @@ public:
: FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {}
~FNET_Transport();
+ size_t events_before_wakeup() const { return _events_before_wakeup; }
+ void events_before_wakeup(size_t events_before_wakeup_in) {
+ _events_before_wakeup = events_before_wakeup_in;
+ }
+
/**
* Try to execute the given task on the internal work pool
* executor (post). If the executor has been closed or there is
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index a20de880f15..d61eaffa24f 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -9,6 +9,7 @@
#include "transport.h"
#include <vespa/vespalib/net/socket_spec.h>
#include <vespa/vespalib/net/server_socket.h>
+#include <vespa/vespalib/util/gate.h>
#include <csignal>
#include <vespa/log/log.h>
@@ -17,6 +18,7 @@ LOG_SETUP(".fnet");
using vespalib::ServerSocket;
using vespalib::SocketHandle;
using vespalib::SocketSpec;
+using OptimizeFor = vespalib::Executor::OptimizeFor;
namespace {
@@ -112,7 +114,7 @@ bool
FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
FNET_Context context)
{
- bool wasEmpty;
+ size_t qLen;
{
std::unique_lock<std::mutex> guard(_lock);
if (IsShutDown()) {
@@ -120,10 +122,10 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
SafeDiscardEvent(cpacket, context);
return false;
}
- wasEmpty = _queue.IsEmpty_NoLock();
_queue.QueuePacket_NoLock(cpacket, context);
+ qLen = _queue.GetPacketCnt_NoLock();
}
- if (wasEmpty) {
+ if (qLen == _owner.events_before_wakeup()) {
_selector.wakeup();
}
return true;
@@ -396,9 +398,8 @@ FNET_TransportThread::InitEventLoop()
return true;
}
-
void
-FNET_TransportThread::handle_wakeup()
+FNET_TransportThread::handle_wakeup_events()
{
{
std::lock_guard<std::mutex> guard(_lock);
@@ -479,7 +480,9 @@ FNET_TransportThread::EventLoopIteration()
// sample current time (performed once per event loop iteration)
_now = clock::now();
- // handle wakeup and io-events
+ handle_wakeup_events();
+
+ // handle io-events
_selector.dispatch(*this);
// handle IOC time-outs
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 8bced290962..b4319d4e2bc 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -54,10 +54,6 @@ private:
bool _finished; // event loop stopped ?
bool _waitFinished; // someone is waiting for _finished
- FNET_TransportThread(const FNET_TransportThread &);
- FNET_TransportThread &operator=(const FNET_TransportThread &);
-
-
/**
* Add an IOComponent to the list of components. This operation is
* performed immidiately and without locking. This method should
@@ -176,7 +172,11 @@ private:
return _shutdown.load(std::memory_order_relaxed);
}
+ void handle_wakeup_events();
+
public:
+ FNET_TransportThread(const FNET_TransportThread &) = delete;
+ FNET_TransportThread &operator=(const FNET_TransportThread &) = delete;
/**
* Construct a transport object. To activate your newly created
* transport object you need to call either the Start method to
@@ -192,7 +192,7 @@ public:
* Destruct object. This should NOT be done before the transport
* thread has completed it's work and raised the finished flag.
**/
- ~FNET_TransportThread();
+ ~FNET_TransportThread() override;
/**
@@ -458,8 +458,8 @@ public:
void WaitFinished();
- // selector call-back for selector wakeup
- void handle_wakeup();
+ // Empty selector call-back for selector wakeup
+ void handle_wakeup() { }
// selector call-back for io-events
void handle_event(FNET_IOComponent &ctx, bool read, bool write);
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 5872706c443..893b8d1d2ca 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -14,6 +14,7 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 2780c3e8770..d701358fc84 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -15,6 +15,7 @@
#include <vespa/fnet/frt/invokable.h>
class FNET_Transport;
+class FastOS_ThreadPool;
namespace slobrok {
namespace api { class RegisterAPI; }
@@ -52,26 +53,26 @@ private:
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- int _requestedPort;
- std::unique_ptr<RPCTargetPool> _targetPool;
- std::unique_ptr<FNET_Task> _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
- bool _allowDispatchForEncode;
- bool _allowDispatchForDecode;
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ int _requestedPort;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ std::unique_ptr<FNET_Task> _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
/**
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index f8ee5afd1fd..b300080db9e 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -73,6 +73,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
_rpc_server_port(rpc_server_port),
_shutdown(false)
{
+ _transport->events_before_wakeup(100);
}
// TODO make sure init/shutdown is safe for aborted init in comm. mgr.
diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.cpp b/vespalib/src/vespa/vespalib/net/async_resolver.cpp
index bc918434077..08e1774b1dd 100644
--- a/vespalib/src/vespa/vespalib/net/async_resolver.cpp
+++ b/vespalib/src/vespa/vespalib/net/async_resolver.cpp
@@ -2,6 +2,7 @@
#include "async_resolver.h"
#include "socket_spec.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP(".vespalib.net.async_resolver");
@@ -149,15 +150,20 @@ AsyncResolver::SP AsyncResolver::_shared_resolver(nullptr);
AsyncResolver::AsyncResolver(HostResolver::SP resolver, size_t num_threads)
: _resolver(std::move(resolver)),
- _executor(num_threads, 128*1024, async_resolver_executor_thread)
+ _executor(std::make_unique<ThreadStackExecutor>(num_threads, 128*1024, async_resolver_executor_thread))
{
}
void
+AsyncResolver::wait_for_pending_resolves() {
+ _executor->sync();
+}
+
+void
AsyncResolver::resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler)
{
auto task = std::make_unique<ResolveTask>(spec, *_resolver, std::move(result_handler));
- auto rejected = _executor.execute(std::move(task));
+ auto rejected = _executor->execute(std::move(task));
assert(!rejected);
}
diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.h b/vespalib/src/vespa/vespalib/net/async_resolver.h
index 590e6672922..f9f2079004b 100644
--- a/vespalib/src/vespa/vespalib/net/async_resolver.h
+++ b/vespalib/src/vespa/vespalib/net/async_resolver.h
@@ -4,7 +4,7 @@
#include "socket_address.h"
#include "socket_spec.h"
-#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <chrono>
#include <memory>
@@ -117,15 +117,15 @@ private:
void run() override;
};
- HostResolver::SP _resolver;
- ThreadStackExecutor _executor;
- static std::mutex _shared_lock;
- static AsyncResolver::SP _shared_resolver;
+ HostResolver::SP _resolver;
+ std::unique_ptr<SyncableThreadExecutor> _executor;
+ static std::mutex _shared_lock;
+ static AsyncResolver::SP _shared_resolver;
AsyncResolver(HostResolver::SP resolver, size_t num_threads);
public:
void resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler);
- void wait_for_pending_resolves() { _executor.sync(); }
+ void wait_for_pending_resolves();
static AsyncResolver::SP create(Params params);
static AsyncResolver::SP get_shared();
};