blob: feb2eb1ad84e70d48e9a8ee81532a2aff955ed41 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <type_traits>
#include <vector>
namespace vespalib {
/**
 * A place where threads meet up and exchange information. Each
 * participating thread calls the rendezvous function with an input
 * value. Execution will be blocked until enough threads are present,
 * at which point mingle will be called with input and output values
 * for all threads available at the same time. When mingle completes,
 * each thread resumes and returns the output value assigned to
 * it. This class implements all needed thread synchronization. The
 * subclass needs to implement the mingle function to supply the
 * application logic.
 *
 * Template parameters:
 *  - IN:          type of the value each thread hands in
 *  - OUT:         type of the value each thread gets back
 *  - external_id: selects which rendezvous overload is available;
 *                 false (default) enables rendezvous(input), true
 *                 enables rendezvous(input, my_id)
 **/
template <typename IN, typename OUT, bool external_id = false>
class Rendezvous
{
private:
    // Synchronization primitives and bookkeeping. How _next and _gen
    // are used is decided by the implementation in rendezvous.hpp,
    // which is not part of this header.
    std::mutex              _lock;
    std::condition_variable _cond;
    size_t                  _size; // number of participants, see size()
    size_t                  _next;
    size_t                  _gen;
    std::vector<IN *>       _in;   // per-participant inputs, read through in()
    std::vector<OUT *>      _out;  // per-participant outputs, written through out()

    /**
     * Function called to perform the actual inter-thread state
     * processing. Implemented by the subclass; it reads inputs with
     * in(i) and stores results with out(i).
     **/
    virtual void mingle() = 0;

    /**
     * lock-free version for when there is only one thread meeting
     * itself.
     *
     * @param input input parameter for the calling thread
     * @param output storage for the calling thread's output
     **/
    void meet_self(IN &input, OUT &output);

    /**
     * general version for when there are multiple threads meeting.
     *
     * @param input input parameter for the calling thread
     * @param output storage for the calling thread's output
     * @param my_id participant id of the calling thread
     * @param guard lock guard passed by value, i.e. ownership of the
     *              guard is handed over to this function
     **/
    void meet_others(IN &input, OUT &output, size_t my_id, std::unique_lock<std::mutex> guard);

protected:
    /**
     * Obtain an input parameter. This function is called by mingle.
     *
     * @return reference to the appropriate input
     * @param i the index of the requested input [0 .. size-1]
     **/
    IN &in(size_t i) const { return *_in[i]; }

    /**
     * Obtain the storage location of an output parameter. This
     * function is called by mingle.
     *
     * @return reference to the appropriate output
     * @param i the index of the requested output [0 .. size-1]
     **/
    OUT &out(size_t i) { return *_out[i]; }

public:
    /**
     * Create a Rendezvous with the given size. The size defines the
     * number of threads that need to call the rendezvous function to
     * trigger a mingle operation. The size of a Rendezvous must be at
     * least 1.
     *
     * The constructor is explicit so that a plain integer is never
     * silently converted into a rendezvous point (for instance via a
     * subclass that inherits this constructor).
     *
     * @param n the size of this Rendezvous
     **/
    explicit Rendezvous(size_t n);
    virtual ~Rendezvous();

    /**
     * @return number of participants
     **/
    size_t size() const { return _size; }

    /**
     * Called by individual threads to synchronize execution and share
     * state with the mingle function. Only available when the
     * external_id template flag is false.
     *
     * @return output parameter for a single thread
     * @param input input parameter for a single thread
     **/
    OUT rendezvous(IN input) requires (!external_id);

    /**
     * Called by individual threads to synchronize execution and share
     * state with the mingle function where each caller has a
     * pre-defined participation id (enable by setting the external_id
     * template flag).
     *
     * @return output parameter for a single thread
     * @param input input parameter for a single thread
     * @param my_id participant id for this thread (must be in range and
     *              not conflicting with other threads)
     **/
    OUT rendezvous(IN input, size_t my_id) requires (external_id);
};
} // namespace vespalib
#include "rendezvous.hpp"
|