summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fnet/src/vespa/fnet/transport.cpp11
-rw-r--r--fnet/src/vespa/fnet/transport.h3
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp1
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h8
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h41
-rw-r--r--vespalib/src/vespa/vespalib/net/async_resolver.cpp10
-rw-r--r--vespalib/src/vespa/vespalib/net/async_resolver.h12
8 files changed, 47 insertions, 40 deletions
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index d3b52969c8c..8543d648400 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -3,6 +3,7 @@
#include "transport.h"
#include "transport_thread.h"
#include "iocomponent.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <chrono>
#include <xxhash.h>
@@ -27,12 +28,12 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool);
FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads)
: _async_resolver(std::move(resolver)),
_crypto_engine(std::move(crypto)),
- _work_pool(1, 128 * 1024, fnet_work_pool, 1024),
+ _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)),
_threads()
{
assert(num_threads >= 1);
for (size_t i = 0; i < num_threads; ++i) {
- _threads.emplace_back(new FNET_TransportThread(*this));
+ _threads.emplace_back(std::make_unique<FNET_TransportThread>(*this));
}
}
@@ -41,7 +42,7 @@ FNET_Transport::~FNET_Transport() = default;
void
FNET_Transport::post_or_perform(vespalib::Executor::Task::UP task)
{
- if (auto rejected = _work_pool.execute(std::move(task))) {
+ if (auto rejected = _work_pool->execute(std::move(task))) {
rejected->run();
}
}
@@ -161,7 +162,7 @@ FNET_Transport::ShutDown(bool waitFinished)
}
if (waitFinished) {
_async_resolver->wait_for_pending_resolves();
- _work_pool.shutdown().sync();
+ _work_pool->shutdown().sync();
}
}
@@ -172,7 +173,7 @@ FNET_Transport::WaitFinished()
thread->WaitFinished();
}
_async_resolver->wait_for_pending_resolves();
- _work_pool.shutdown().sync();
+ _work_pool->shutdown().sync();
}
bool
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 02ef22c7fb6..29f1fb2144f 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -7,7 +7,6 @@
#include <vector>
#include <vespa/vespalib/net/async_resolver.h>
#include <vespa/vespalib/net/crypto_engine.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
class FNET_TransportThread;
class FastOS_ThreadPool;
@@ -30,7 +29,7 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
- vespalib::ThreadStackExecutor _work_pool;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
Threads _threads;
public:
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index a20de880f15..ba9c4bb7789 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -9,6 +9,7 @@
#include "transport.h"
#include <vespa/vespalib/net/socket_spec.h>
#include <vespa/vespalib/net/server_socket.h>
+#include <vespa/vespalib/util/gate.h>
#include <csignal>
#include <vespa/log/log.h>
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 8bced290962..966ffb849e0 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -54,10 +54,6 @@ private:
bool _finished; // event loop stopped ?
bool _waitFinished; // someone is waiting for _finished
- FNET_TransportThread(const FNET_TransportThread &);
- FNET_TransportThread &operator=(const FNET_TransportThread &);
-
-
/**
* Add an IOComponent to the list of components. This operation is
* performed immidiately and without locking. This method should
@@ -177,6 +173,8 @@ private:
}
public:
+ FNET_TransportThread(const FNET_TransportThread &) = delete;
+ FNET_TransportThread &operator=(const FNET_TransportThread &) = delete;
/**
* Construct a transport object. To activate your newly created
* transport object you need to call either the Start method to
@@ -192,7 +190,7 @@ public:
* Destruct object. This should NOT be done before the transport
* thread has completed it's work and raised the finished flag.
**/
- ~FNET_TransportThread();
+ ~FNET_TransportThread() override;
/**
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 5872706c443..893b8d1d2ca 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -14,6 +14,7 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 2780c3e8770..d701358fc84 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -15,6 +15,7 @@
#include <vespa/fnet/frt/invokable.h>
class FNET_Transport;
+class FastOS_ThreadPool;
namespace slobrok {
namespace api { class RegisterAPI; }
@@ -52,26 +53,26 @@ private:
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- int _requestedPort;
- std::unique_ptr<RPCTargetPool> _targetPool;
- std::unique_ptr<FNET_Task> _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
- bool _allowDispatchForEncode;
- bool _allowDispatchForDecode;
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ int _requestedPort;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ std::unique_ptr<FNET_Task> _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
/**
diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.cpp b/vespalib/src/vespa/vespalib/net/async_resolver.cpp
index bc918434077..08e1774b1dd 100644
--- a/vespalib/src/vespa/vespalib/net/async_resolver.cpp
+++ b/vespalib/src/vespa/vespalib/net/async_resolver.cpp
@@ -2,6 +2,7 @@
#include "async_resolver.h"
#include "socket_spec.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP(".vespalib.net.async_resolver");
@@ -149,15 +150,20 @@ AsyncResolver::SP AsyncResolver::_shared_resolver(nullptr);
AsyncResolver::AsyncResolver(HostResolver::SP resolver, size_t num_threads)
: _resolver(std::move(resolver)),
- _executor(num_threads, 128*1024, async_resolver_executor_thread)
+ _executor(std::make_unique<ThreadStackExecutor>(num_threads, 128*1024, async_resolver_executor_thread))
{
}
void
+AsyncResolver::wait_for_pending_resolves() {
+ _executor->sync();
+}
+
+void
AsyncResolver::resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler)
{
auto task = std::make_unique<ResolveTask>(spec, *_resolver, std::move(result_handler));
- auto rejected = _executor.execute(std::move(task));
+ auto rejected = _executor->execute(std::move(task));
assert(!rejected);
}
diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.h b/vespalib/src/vespa/vespalib/net/async_resolver.h
index 590e6672922..f9f2079004b 100644
--- a/vespalib/src/vespa/vespalib/net/async_resolver.h
+++ b/vespalib/src/vespa/vespalib/net/async_resolver.h
@@ -4,7 +4,7 @@
#include "socket_address.h"
#include "socket_spec.h"
-#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <chrono>
#include <memory>
@@ -117,15 +117,15 @@ private:
void run() override;
};
- HostResolver::SP _resolver;
- ThreadStackExecutor _executor;
- static std::mutex _shared_lock;
- static AsyncResolver::SP _shared_resolver;
+ HostResolver::SP _resolver;
+ std::unique_ptr<SyncableThreadExecutor> _executor;
+ static std::mutex _shared_lock;
+ static AsyncResolver::SP _shared_resolver;
AsyncResolver(HostResolver::SP resolver, size_t num_threads);
public:
void resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler);
- void wait_for_pending_resolves() { _executor.sync(); }
+ void wait_for_pending_resolves();
static AsyncResolver::SP create(Params params);
static AsyncResolver::SP get_shared();
};