diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-04 18:30:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-04 18:30:45 +0200 |
commit | ed34407f0470d5ff5446df903c57f7b649dc835f (patch) | |
tree | eb68c779a18bf943f52e8dbeadaf60ff66bd51e0 | |
parent | bc1ca93c91c88630f4b61e98169b224143b8829c (diff) | |
parent | a8043a2e3edb544442961e3947c466e6d45ad716 (diff) |
Merge pull request #12828 from vespa-engine/balder/control-naptime
Balder/control naptime
4 files changed, 16 insertions, 6 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index e77664b0987..e049e436d03 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -87,7 +87,7 @@ createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); case RPCNetworkParams::OptimizeFor::THROUGHPUT: default: - return std::make_unique<vespalib::SingleExecutor>(100); + return std::make_unique<vespalib::SingleExecutor>(1000, 10, 1ms); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 90eb18c23ef..6492c301fe5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -6,6 +6,10 @@ namespace vespalib { SingleExecutor::SingleExecutor(uint32_t taskLimit) + : SingleExecutor(taskLimit, taskLimit/10, 5ms) +{ } + +SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration napTime) : _taskLimit(vespalib::roundUp2inN(taskLimit)), _wantedTaskLimit(_taskLimit.load()), _rp(0), @@ -19,10 +23,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0), + _watermark(watermark), + _napTime(napTime), _closed(false) { _thread.start(); } + SingleExecutor::~SingleExecutor() { shutdown(); sync(); @@ -102,10 +109,10 @@ SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); _producerCondition.notify_all(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, 10ms); + _consumerCondition.wait_for(lock, _napTime); } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -144,7 +151,7 @@ SingleExecutor::wait_for_room(Lock & lock) { } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, 10ms, wp - taskLimit/4); + sleepProducer(lock, _napTime, wp - _watermark); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 3d759769ea3..cb78f8448f4 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -19,6 +19,7 @@ namespace vespalib { class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: explicit SingleExecutor(uint32_t taskLimit); + SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration napTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -56,6 +57,8 @@ private: std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; + const uint32_t _watermark; + const duration _napTime; bool _closed; }; diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 0bc3f67f9c3..e0f2e89fa9d 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -35,9 +35,9 @@ mbus.num_network_threads int default=2 ## Number of workers threads for messagebus. ## Any value below 1 will be 1. -mbus.num_threads int default=2 +mbus.num_threads int default=1 -mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY +mbus.optimize_for enum {LATENCY, THROUGHPUT} default = THROUGHPUT ## Enable to use above thread pool for encoding replies ## False will use network(fnet) thread |