aboutsummaryrefslogtreecommitdiffstats
path: root/vespamalloc/src/tests/allocfree/producerconsumer.cpp
blob: f06c35b63bdc3616ebeff703bf468d7c08d104b6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "producerconsumer.h"

namespace vespalib {

/**
 * Creates a consumer with its own queue of work lists.
 *
 * @param maxQueue passed on as the second argument of the queue constructor
 *                 (presumably the queue capacity - confirm against the queue class).
 * @param inverse  when true, run() consumes every dequeued list back to front.
 */
Consumer::Consumer(uint32_t maxQueue, bool inverse) :
    // A null list is handed to the queue; run() stops when dequeue() yields a null list.
    _queue(nullptr, maxQueue),
    _inverse(inverse),
    _operations(0)
{
}

// Nothing to release explicitly; defined out of line in this translation unit.
Consumer::~Consumer() = default;

/**
 * Creates a producer that feeds the given consumer.
 *
 * @param cnt    number of items run() puts into each list before enqueueing it.
 * @param target consumer that receives the lists; held by reference.
 */
Producer::Producer(uint32_t cnt, Consumer &target)
    : _target(target),
      _cnt(cnt),
      _operations(0)
{ }

// Nothing to release explicitly; defined out of line in this translation unit.
Producer::~Producer() = default;

/**
 * Creates a combined producer/consumer that works on a private list.
 *
 * @param cnt     number of items produced (and then consumed) per round in run().
 * @param inverse when true, run() consumes each round's items back to front.
 */
ProducerConsumer::ProducerConsumer(uint32_t cnt, bool inverse)
    : _cnt(cnt),
      _inverse(inverse),
      _operationsConsumed(0),
      _operationsProduced(0)
{ }

// Nothing to release explicitly; defined out of line in this translation unit.
ProducerConsumer::~ProducerConsumer() = default;


/**
 * Drains the queue: dequeues lists one at a time, passes every element to
 * consume() and then deletes the list. The stop flag argument is ignored;
 * the loop only ends when dequeue() returns a null list.
 *
 * Elements are consumed last-to-first when _inverse is set, otherwise
 * first-to-last. _operations is incremented once per consumed element.
 */
void Consumer::run(std::atomic<bool> &) {
    for (;;) {
        MemList ml = _queue.dequeue();
        if (ml == nullptr) {
            // Null list: no more work, leave the thread function.
            return;
        }
        if (_inverse) {
            // Count down from size() so the unsigned index never wraps below zero.
            for (uint32_t i = ml->size(); i > 0; --i) {
                consume((*ml)[i - 1]);
                _operations++;
            }
        } else {
            for (uint32_t i = 0; i < ml->size(); ++i) {
                consume((*ml)[i]);
                _operations++;
            }
        }
        // The list itself is released here, after all its elements were consumed.
        delete ml;
    }
}

void Producer::run(std::atomic<bool> &stop_flag) {
    while (!stop_flag.load(std::memory_order_relaxed)) {
        MemList ml = new MemListImpl();
        for (uint32_t i = 0; i < _cnt; ++i) {
            ml->push_back(produce());
            _operations++;
        }
        _target.enqueue(ml);
    }
    _target.close();
}

void ProducerConsumer::run(std::atomic<bool> &stop_flag) {
    while (!stop_flag.load(std::memory_order_relaxed)) {
        MemListImpl ml;
        for (uint32_t i = 0; i < _cnt; ++i) {
            ml.push_back(produce());
            _operationsProduced++;
        }
        if (_inverse) {
            for (uint32_t i = ml.size(); i > 0; --i) {
                consume(ml[i - 1]);
                _operationsConsumed++;
            }
        } else {
            for (uint32_t i = 0; i < ml.size(); ++i) {
                consume(ml[i]);
                _operationsConsumed++;
            }
        }
    }
}

}