blob: 47404bb365f5c3cdfeb5cc65a89b4b2f00d18bc0 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "throttlingoperationstarter.h"
#include <cassert>
namespace storage::distributor {
ThrottlingOperationStarter::ThrottlingOperation::~ThrottlingOperation()
{
_operationStarter.signalOperationFinished(*this);
}
ThrottlingOperationStarter::~ThrottlingOperationStarter() = default;
bool
ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority priority) const
{
uint32_t variablePending(_maxPending - _minPending);
uint32_t maxPendingForPri(_minPending + variablePending*((255.0 - priority) / 255.0));
return currentOperationCount < maxPendingForPri;
}
bool
ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority)
{
if (!may_allow_operation_with_priority(priority)) {
operation->on_throttled();
return false;
}
auto wrappedOp = std::make_shared<ThrottlingOperation>(operation, *this);
++_pendingCount;
return _starterImpl.start(wrappedOp, priority);
}
bool
ThrottlingOperationStarter::may_allow_operation_with_priority(Priority priority) const noexcept
{
return canStart(_pendingCount, priority);
}
void
ThrottlingOperationStarter::signalOperationFinished(const Operation& op)
{
(void) op;
assert(_pendingCount > 0);
--_pendingCount;
}
}
|