summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-04 18:30:45 +0200
committerGitHub <noreply@github.com>2020-04-04 18:30:45 +0200
commited34407f0470d5ff5446df903c57f7b649dc835f (patch)
treeeb68c779a18bf943f52e8dbeadaf60ff66bd51e0
parentbc1ca93c91c88630f4b61e98169b224143b8829c (diff)
parenta8043a2e3edb544442961e3947c466e6d45ad716 (diff)
Merge pull request #12828 from vespa-engine/balder/control-naptime
Balder/control naptime
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp13
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h3
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def4
4 files changed, 16 insertions, 6 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index e77664b0987..e049e436d03 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -87,7 +87,7 @@ createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
case RPCNetworkParams::OptimizeFor::THROUGHPUT:
default:
- return std::make_unique<vespalib::SingleExecutor>(100);
+ return std::make_unique<vespalib::SingleExecutor>(1000, 10, 1ms);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 90eb18c23ef..6492c301fe5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -6,6 +6,10 @@
namespace vespalib {
SingleExecutor::SingleExecutor(uint32_t taskLimit)
+ : SingleExecutor(taskLimit, taskLimit/10, 5ms)
+{ }
+
+SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration napTime)
: _taskLimit(vespalib::roundUp2inN(taskLimit)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
@@ -19,10 +23,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0),
+ _watermark(watermark),
+ _napTime(napTime),
_closed(false)
{
_thread.start();
}
+
SingleExecutor::~SingleExecutor() {
shutdown();
sync();
@@ -102,10 +109,10 @@ SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
_producerCondition.notify_all();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, 10ms);
+ _consumerCondition.wait_for(lock, _napTime);
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -144,7 +151,7 @@ SingleExecutor::wait_for_room(Lock & lock) {
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, 10ms, wp - taskLimit/4);
+ sleepProducer(lock, _napTime, wp - _watermark);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 3d759769ea3..cb78f8448f4 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -19,6 +19,7 @@ namespace vespalib {
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
explicit SingleExecutor(uint32_t taskLimit);
+ SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration napTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
@@ -56,6 +57,8 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
+ const uint32_t _watermark;
+ const duration _napTime;
bool _closed;
};
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 0bc3f67f9c3..e0f2e89fa9d 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -35,9 +35,9 @@ mbus.num_network_threads int default=2
## Number of workers threads for messagebus.
## Any value below 1 will be 1.
-mbus.num_threads int default=2
+mbus.num_threads int default=1
-mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY
+mbus.optimize_for enum {LATENCY, THROUGHPUT} default = THROUGHPUT
## Enable to use above thread pool for encoding replies
## False will use network(fnet) thread