aboutsummaryrefslogtreecommitdiffstats
path: root/document/src/vespa/document/util/queue.h
blob: e9a53fe5ae3389efbcd3c9be572ed4494ed4dba6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

#define UNUSED_PARAM(p)
namespace document {

// XXX move to vespalib (or remove)
/**
 * semaphore implementation with copy/assignment functionality.
 **/
class Semaphore
{
private:
    int _count;
    int _numWaiters;
    std::mutex _lock;
    std::condition_variable _cond;
public:
    Semaphore(int count=0) : _count(count), _numWaiters(0), _lock() { }

    ~Semaphore() {
        std::lock_guard guard(_lock);
        assert(_numWaiters == 0);
    }

    bool wait(int ms) {
        bool gotSemaphore = false;
        std::unique_lock guard(_lock);
        if (_count == 0) {
            _numWaiters++;
            // we could retry if we get a signal but not the semaphore,
            // but then we risk waiting longer than expected, so
            // just ignore the return value here.
            _cond.wait_for(guard, std::chrono::milliseconds(ms));
            _numWaiters--;
        }
        if (_count > 0) {
            _count--;
            gotSemaphore = true;
        }
        assert(_count >= 0);
        return gotSemaphore;
    }

    bool wait() {
        std::unique_lock guard(_lock);
        while (_count == 0) {
            _numWaiters++;
            _cond.wait(guard);
            _numWaiters--;
        }
        _count--;
        assert(_count >= 0);
        return true;
    }

    void post() {
        std::unique_lock guard(_lock);
        assert(_count >= 0);
        _count++;
        if (_numWaiters > 0) {
            _cond.notify_one();
        }
    }
};


template <typename T, typename Q=std::queue<T> >
class QueueBase
{
public:
    QueueBase() : _lock(), _count(0), _q()   { }
    virtual ~QueueBase() { }
    size_t size()  const { return internal_size(); }
    bool   empty() const { return size() == 0; }
protected:
    std::mutex          _lock;
    document::Semaphore _count;
    Q                   _q;

    bool internal_push(const T& msg) { _q.push(msg); return true; }
    bool internal_pop(T& msg) {
        if (_q.empty()) {
            return false;
        } else {
            msg = _q.front();
            _q.pop();
            return true;
	}
    }
    size_t internal_size() const { return _q.size(); }
};

/**
 * This is a simple queue template that implements a thread safe Q by using
 * the stl::queue template. Not in any way optimized. Supports simple push and
 * pop operations together with read of size and empty check.
 **/
template <typename T, typename Q=std::queue<T> >
class Queue : public QueueBase<T, Q>
{
public:
    Queue() : QueueBase<T,Q>() { }
    bool push(const T& msg, int timeout=-1)
    {
        (void)timeout;
        bool retval;
        {
            std::lock_guard guard(this->_lock);
            retval = this->internal_push(msg);
        }
        this->_count.post();
        return retval;
    }
    bool pop(T& msg, int timeout=-1)
    {
        bool retval((timeout == -1) ?
                    this->_count.wait() :
                    this->_count.wait(timeout));
        if ( retval ) {
            std::lock_guard guard(this->_lock);
            retval = this->internal_pop(msg);
        }
        return retval;
    }
};

} // namespace document