1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "request_scheduler.h"
#include <vbench/core/timer.h>
namespace vbench {
// Thread stack tags used by the RequestScheduler constructor in this file:
// vbench_request_scheduler_executor is handed to the scheduler thread (_thread)
// and vbench_handler_executor is handed to the handler proxy (_proxy).
VESPA_THREAD_STACK_TAG(vbench_request_scheduler_executor);
VESPA_THREAD_STACK_TAG(vbench_handler_executor);
void
RequestScheduler::run()
{
double sleepTime;
std::vector<Request::UP> list;
vespalib::Thread &thread = vespalib::Thread::currentThread();
while (_queue.extract(_timer.sample(), list, sleepTime)) {
for (size_t i = 0; i < list.size(); ++i) {
Request::UP request = Request::UP(list[i].release());
_dispatcher.handle(std::move(request));
}
list.clear();
thread.slumber(sleepTime);
}
}
/**
 * Set up the scheduler and its workers.
 *
 * The members are chained by construction: _proxy is built on 'next',
 * _droppedTagger on _proxy and _dispatcher on _droppedTagger. The
 * scheduler thread is created with this object as its argument, and the
 * connection pool takes over 'crypto' together with the shared timer.
 *
 * After member construction, 'numWorkers' Worker objects are created (each
 * given the dispatcher, the proxy, the connection pool and the timer) and
 * _dispatcher.waitForThreads(numWorkers, 256) is called.
 *
 * @param crypto     crypto engine handle, moved into the connection pool
 * @param next       handler that _proxy is constructed on
 * @param numWorkers number of Worker objects to create
 */
RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers)
    : _timer(),
      _proxy(next, vbench_handler_executor),
      _queue(10.0, 0.020),
      _droppedTagger(_proxy),
      _dispatcher(_droppedTagger),
      _thread(*this, vbench_request_scheduler_executor),
      _connectionPool(std::move(crypto), _timer),
      _workers()
{
    for (size_t i = 0; i < numWorkers; ++i) {
        // make_unique instead of a hand-wrapped 'new': no naked allocation.
        _workers.push_back(std::make_unique<Worker>(_dispatcher, _proxy, _connectionPool, _timer));
    }
    // NOTE(review): the meaning of the second argument (256) is not visible
    // in this file; see the dispatcher's waitForThreads() declaration.
    _dispatcher.waitForThreads(numWorkers, 256);
}
/**
 * Abort scheduling.
 *
 * Issues, in this order: close() on the request queue, discard() on the
 * request queue, and stop() on the scheduler thread. No join is performed
 * here.
 */
void
RequestScheduler::abort()
{
    _queue.close();
    _queue.discard();
    _thread.stop();
}
/**
 * Accept a request and insert it into the queue under its own scheduled
 * time.
 *
 * @param request the request to enqueue; ownership is moved into the queue
 */
void
RequestScheduler::handle(Request::UP request)
{
    // The time must be read before the move below gives 'request' away.
    const double when = request->scheduledTime();
    _queue.insert(std::move(request), when);
}
/**
 * Start scheduling: resets the timer first, then starts the scheduler
 * thread.
 */
void
RequestScheduler::start()
{
    _timer.reset();
    _thread.start();
}
/**
 * Close the request queue.
 *
 * @return this object, so that further calls can be chained
 */
RequestScheduler &
RequestScheduler::stop()
{
    _queue.close();
    return *this;
}
void
RequestScheduler::join()
{
_thread.join();
_dispatcher.close();
for (size_t i = 0; i < _workers.size(); ++i) {
_workers[i]->join();
}
_proxy.join();
}
} // namespace vbench
|