blob: f095e2eabcae7114e7820ae699be4dbd2caf0b8b (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "imessagehandler.h"
#include "ireplyhandler.h"
#include "message.h"
#include "reply.h"
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <condition_variable>
#include <mutex>
#include <thread>
namespace mbus {
/**
* This class implements a single thread that is able to process arbitrary
* tasks. Tasks are enqueued using the synchronized enqueue(ITask::UP)
* method, and are run in the order they were enqueued.
*
* Typical call order, as described by the member documentation below:
* construct, add any recurrent tasks, call start(), then post work through
* enqueue(), deliverMessage() or deliverReply().
*/
class Messenger {
public:
/**
* Defines the required interface for tasks to be posted to this worker.
*
* Extends vespalib::Executor::Task with a priority. The default constructor
* is protected, so only subclasses can be instantiated, and copying is
* disabled.
*/
class ITask : public vespalib::Executor::Task {
protected:
ITask() = default;
public:
// Tasks are not copyable; they are handed around through ITask::UP.
ITask(const ITask &) = delete;
ITask & operator = (const ITask &) = delete;
/**
* Convenience alias for an owning pointer to a task.
*/
using UP = std::unique_ptr<ITask>;
/**
* Returns the priority of this task. Must be implemented by every concrete
* task.
*
* NOTE(review): how the value is interpreted (ordering, range) is not
* visible in this header -- confirm against the callers.
*
* @return The priority of this task.
*/
virtual uint8_t priority() const = 0;
};
private:
// Declared mutable so that it can also be locked from const members such
// as isEmpty(). Presumably guards the members below -- confirm in the
// implementation.
mutable std::mutex _lock;
// Condition variable; presumably paired with _lock to wake the internal
// thread -- confirm in the implementation.
std::condition_variable _cond;
// NOTE(review): raw pointers. Presumably the recurrent tasks handed to
// addRecurrentTask(), owned by this object -- confirm.
std::vector<ITask*> _children;
// NOTE(review): raw pointers. Presumably the pending tasks handed to
// enqueue(); the destructor documentation states that queued tasks are
// destroyed, which suggests these are owning -- confirm.
vespalib::ArrayQueue<ITask*> _queue;
// Presumably set once the messenger is shutting down -- confirm.
bool _closed;
// The internal worker thread referred to by start() and sync().
std::thread _thread;
protected:
/**
* NOTE(review): presumably the entry point of the internal thread, i.e. the
* main loop that runs the recurrent tasks and processes the queue. The
* definition is not part of this header -- confirm.
*/
void run();
public:
/**
* Constructs a messenger. According to the contract of start(), the
* internal thread is started separately by calling start().
*/
Messenger();
/**
* Frees any allocated resources. Also destroys all queued tasks.
*/
~Messenger();
/**
* Adds a recurrent task that is to be run for every iteration of the main
* loop. The task must be very light-weight so that it does not block the
* messenger. This method is thread-safe.
*
* @param task The task to add. It is passed as an owning pointer, so the
*             caller gives up ownership.
*/
void addRecurrentTask(ITask::UP task);
/**
* Discards all the recurrent tasks previously added using the
* addRecurrentTask(ITask::UP) method. This method is thread-safe.
*/
void discardRecurrentTasks();
/**
* Starts the internal thread. This must be done AFTER all recurrent tasks
* have been added.
*
* @return True if the thread was started.
* @see addRecurrentTask(ITask::UP)
*/
bool start();
/**
* Handshakes with the internal thread. If this method is called from the
* messenger thread itself, it will deadlock.
*/
void sync();
/**
* Convenience method to post a task to the queue of tasks to be executed
* that delivers the given message to the given handler.
*
* NOTE(review): the handler is taken by reference, so it presumably has to
* stay alive until the task has run -- confirm against the implementation.
*
* @param msg     The message to send. Ownership is passed to the callee.
* @param handler The handler to send to.
*/
void deliverMessage(Message::UP msg, IMessageHandler &handler);
/**
* Convenience method to post a task to the queue of tasks to be executed
* that returns the given reply to the given handler.
*
* NOTE(review): the handler is taken by reference, so it presumably has to
* stay alive until the task has run -- confirm against the implementation.
*
* @param reply   The reply to return. Ownership is passed to the callee.
* @param handler The handler to return to.
*/
void deliverReply(Reply::UP reply, IReplyHandler &handler);
/**
* Enqueues the given task in the list of tasks that this worker is to
* process.
*
* NOTE(review): earlier documentation said that a destroy() method is
* invoked on the task if this thread has already been destroyed; no such
* method is declared in this header. Since the task is handed over as an
* owning pointer, a task that is not queued is presumably just destroyed --
* confirm against the implementation.
*
* @param task The task to enqueue. Ownership is passed to the callee.
*/
void enqueue(ITask::UP task);
/**
* Checks whether there are no tasks queued for execution.
*
* @return True if there are no tasks.
*/
bool isEmpty() const;
};
} // namespace mbus
|