blob: ab3cfe6dac525b95148f3c0fcea1983a51d26c1f (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "producerconsumer.h"
namespace vespalib {
Consumer::Consumer(uint32_t maxQueue, bool inverse) :
_queue(NULL, maxQueue),
_inverse(inverse),
_operations(0)
{
}
Consumer::~Consumer()
{
}
Producer::Producer(uint32_t cnt, Consumer &target) :
_target(target),
_cnt(cnt),
_operations(0)
{
}
Producer::~Producer()
{
}
ProducerConsumer::ProducerConsumer(uint32_t cnt, bool inverse) :
_cnt(cnt),
_inverse(inverse),
_operationsConsumed(0),
_operationsProduced(0)
{
}
ProducerConsumer::~ProducerConsumer()
{
}
void Consumer::run(std::atomic<bool> &) {
for (;;) {
MemList ml = _queue.dequeue();
if (ml == NULL) {
return;
}
if (_inverse) {
for (uint32_t i = ml->size(); i > 0; --i) {
consume((*ml)[i - 1]);
_operations++;
}
} else {
for (uint32_t i = 0; i < ml->size(); ++i) {
consume((*ml)[i]);
_operations++;
}
}
delete ml;
}
}
void Producer::run(std::atomic<bool> &stop_flag) {
while (!stop_flag.load(std::memory_order_relaxed)) {
MemList ml = new MemListImpl();
for (uint32_t i = 0; i < _cnt; ++i) {
ml->push_back(produce());
_operations++;
}
_target.enqueue(ml);
}
_target.close();
}
void ProducerConsumer::run(std::atomic<bool> &stop_flag) {
while (!stop_flag.load(std::memory_order_relaxed)) {
MemListImpl ml;
for (uint32_t i = 0; i < _cnt; ++i) {
ml.push_back(produce());
_operationsProduced++;
}
if (_inverse) {
for (uint32_t i = ml.size(); i > 0; --i) {
consume(ml[i - 1]);
_operationsConsumed++;
}
} else {
for (uint32_t i = 0; i < ml.size(); ++i) {
consume(ml[i]);
_operationsConsumed++;
}
}
}
}
}
|