AIStatefulTask ‐ Asynchronous, Stateful Task Scheduler library.

Threads-like task objects evolving through user-defined states.

Broker.h
1#pragma once
2
3#include "AIStatefulTask.h"
4#include "BrokerKey.h"
5#include "threadsafe/AIReadWriteMutex.h"
6#include "utils/threading/MpscQueue.h"
7#include "utils/threading/Gate.h"
8#include <type_traits>
9#include <tuple>
10#ifdef CWDEBUG
11#include "utils/has_print_on.h"
12#endif
13
14#if defined(CWDEBUG) && !defined(DOXYGEN)
15NAMESPACE_DEBUG_CHANNELS_START
16extern channel_ct broker;
17NAMESPACE_DEBUG_CHANNELS_END
18#endif
19
20namespace task {
21#ifdef CWDEBUG
22using utils::has_print_on::operator<<;
23#endif
24
// Broker is a task (derived from AIStatefulTask) that owns a map, m_key2task, from a BrokerKey to a
// Task object plus a queue of callbacks. run(key, callback) looks the key up, creates the Task from
// the constructor arguments stored in m_debugflag_task_args when the key is not in the map yet, and
// arranges for callback to be called with the success flag of that Task.
25template<TaskType Task, typename... Args>
26class Broker : public AIStatefulTask
27{
28 protected:
29 using direct_base_type = AIStatefulTask; // The immediate base class of this task.
30
31 // The different states of the task.
 // Broker_start is the initial state; Broker_do_work is the state in which the map entries are processed.
32 enum broker_state_type {
33 Broker_start = direct_base_type::state_end,
34 Broker_do_work
35 };
36
37 public:
38 static constexpr state_type state_end = Broker_do_work + 1; // The last state plus one.
39
40 private:
 // A node of the callback queue: an MpscNode that carries one callback.
 // Nodes are allocated when a callback is queued and deleted by the Broker after popping them.
41 struct CallbackNode : utils::threading::MpscNode
42 {
43 std::function<void(bool)> m_callback;
44
45 CallbackNode(std::function<void(bool)>&& callback) : m_callback(std::move(callback)) { }
46 };
47
48 // Constness of this object means that we have got access to it through read-locking m_key2task.
49 // In most cases that read-lock is even released again by the time this object is accessed.
50 //
51 // Concurrent access is made safe in other ways.
52 //
53 // The members are made mutable because this access also involves writing.
54 struct TaskPointerAndCallbackQueue {
55 boost::intrusive_ptr<Task> const m_task;
56 // Concurrent access is fine since callbacks_type is thread safe: access is protected by its own mutex.
 // NOTE(review): the member below is a utils::threading::MpscQueue, not a `callbacks_type`; the remark
 // about a mutex may be out of date - confirm against MpscQueue.
57 mutable utils::threading::MpscQueue m_callbacks;
58 // Only when m_finished is loaded with acquire and is true, the Task that m_task points to and the boolean m_success may be read.
59 // m_finished is initialized at false and only set to true once by the Broker task, using memory_order_release.
60 // Other threads only read m_success after loading m_finished with memory_order_acquire and seeing that being true - which means that
61 // it is safe for the Broker task to write to m_success before setting m_finished to true.
62 mutable std::atomic<bool> m_finished; // Set to true when the callback of the actual task is called.
63 mutable bool m_success; // Set to the value passed to the actual callback of the task.
64 // This variable is only accessed by the Broker task; and thus it is virtually single-threaded.
 // It is set to true by the Broker right before it calls run() on the task.
65 mutable bool m_running;
66
 // Takes ownership of task and queues callback as the first callback of this entry.
67 TaskPointerAndCallbackQueue(boost::intrusive_ptr<Task>&& task, std::function<void(bool)>&& callback) :
68 m_task(std::move(task)), m_finished(false), m_success(false), m_running(false) { m_callbacks.push(NEW(CallbackNode(std::move(callback)))); }
69
70#ifdef CWDEBUG
 // Debug output: prints the demangled type name of Task between double quotes.
71 void print_on(std::ostream& os) const
72 {
73 os << '"' << libcwd::type_info_of<Task>().demangled_name() << '"';
74 }
75#endif
76 };
 // NOTE(review): listing lines 80-81 are absent here; the remaining template arguments (if any) and the
 // closing `>;` of unordered_map_type are not shown - restore them from the real header.
77 using unordered_map_type = std::unordered_map<
78 statefultask::BrokerKey::unique_ptr,
79 TaskPointerAndCallbackQueue,
 // The map wrapped for access through read (rat) and write (wat) access objects.
82 using map_type = aithreadsafe::Wrapper<unordered_map_type, aithreadsafe::policy::ReadWrite<AIReadWriteMutex>>;
83
 // One entry per key; entries are added by run(key, callback).
84 map_type m_key2task;
 // Copied from handler.is_immediate() when the Broker is started with run(handler).
85 bool m_is_immediate;
 // Opened by the callback that run(handler) installs; terminate() waits on it.
86 utils::threading::Gate m_finished;
 // The arguments used to create each new Task (preceded by the debug flag when CWDEBUG is defined).
87 std::tuple<CWDEBUG_ONLY(bool,) Args...> m_debugflag_task_args;
88
89 protected:
 // Overrides of AIStatefulTask; state_str_impl, task_name_impl, multiplex_impl and abort_impl are defined below the class.
90 ~Broker() override { DoutEntering(dc::broker(mSMDebug), "~Broker() [" << (void*)this << "]"); }
91 char const* state_str_impl(state_type run_state) const override;
92 char const* task_name_impl() const override;
93 void multiplex_impl(state_type run_state) override;
94 void abort_impl() override;
95
96 public:
 // Stores task_args (and, with CWDEBUG, the debug flag) for the later creation of Task objects.
97 Broker(CWDEBUG_ONLY(bool debug,) Args... task_args) :
98 AIStatefulTask(CWDEBUG_ONLY(debug)), m_is_immediate(false), m_debugflag_task_args(CWDEBUG_ONLY(debug,) task_args...)
99 {
100 DoutEntering(dc::broker(mSMDebug), "Broker<" <<
101 libcwd::type_info_of<Task>().demangled_name() <<
102 ((LibcwDoutStream << ... << (std::string(", ") + libcwd::type_info_of<Args>().demangled_name())), ">::Broker(") <<
103 join(", ", task_args...) << ") [" << (void*)this << "]");
104 }
105
 // NOTE(review): listing line 106, the header of the member function whose body follows, is absent.
 // The body starts the Broker task itself under a parameter named `handler` - restore the header from the real file.
107 {
108 // If the handler passed here is immediate, then a call to run(key, callback) while
109 // the task is already finished will lead to an immediate call to the callback.
110 // Otherwise the callback is performed from the Broker task (under the handler).
111 m_is_immediate = handler.is_immediate();
112 AIStatefulTask::run(handler, [this](bool success){
113 // Can only be terminated by calling abort().
114 ASSERT(!success);
115 Dout(dc::broker, "task::Broker<" << libcwd::type_info_of<Task>().demangled_name() << "> terminated.");
116 m_finished.open();
117 });
118 }
119
 // Abort the Broker task and wait on m_finished, which is opened by the callback installed when the Broker was started.
120 void terminate()
121 {
122 abort();
123 m_finished.wait();
124 }
125
126 // Abort and run callback(task) for each task in m_key2task.
 // The callback is called while write access to m_key2task is held.
127 void terminate(std::function<void (Task*)> callback)
128 {
129 terminate();
130 typename map_type::wat key2task_w(m_key2task);
131 for (auto& element : *key2task_w)
132 callback(element.second.m_task.get());
133 }
134
135 // The returned pointer is meant to keep the task alive, not to access it (it is possibly shared between threads).
136 // Read access is allowed only after (during) the callback was called.
137 boost::intrusive_ptr<Task const> run(statefultask::BrokerKey const& key, std::function<void(bool)>&& callback);
138};
139
140template<TaskType Task, typename... Args>
141boost::intrusive_ptr<Task const> Broker<Task, Args...>::run(statefultask::BrokerKey const& key, std::function<void(bool)>&& callback)
142{
143 DoutEntering(dc::broker, "Broker<" << libcwd::type_info_of<Task>().demangled_name() << ", void>::run(" << key << ", callback)");
144 // This function returns a pointer to an immutable Task, because the returned
145 // task is shared between threads and readonly. Note that reading it is only allowed
146 // after the task finished running because otherwise writing may occur at the same
147 // time, which is UB.
148 // This must be a const* because we set it while only having a read lock on the unordered_map.
149 typename unordered_map_type::mapped_type const* entry;
150 // A boolean indicating if a task with the required key already existed or not.
151 bool task_created;
152 for (;;)
153 {
154 try
155 {
156 // Obtain a read-lock and read-access to m_key2task.
157 typename map_type::rat key2task_r(m_key2task);
158 // The cast is necessary because find() requires the non-const BrokerKey::unique_ptr reference
159 // (as opposed to BrokerKey::const_unique_ptr). It is safe because find() will not alter the
160 // BrokerKey pointed to.
161 auto search = key2task_r->find(const_cast<statefultask::BrokerKey&>(key).non_owning_ptr());
162 if ((task_created = search == key2task_r->end()))
163 {
164 // The task wasn't created yet.
165 // In order to do so, we have to obtain the write lock first.
166 typename map_type::wat key2task_w(key2task_r); // This might throw.
167 // Create the task and put the boost::intrusive_ptr to it into the unordered_map together with a CallbackQueue object
168 // already filled with callback, under key. Store the pointer to the new pair into entry.
169 boost::intrusive_ptr<Task> task = std::apply([](auto&&... args){ return statefultask::create<Task>(std::forward<decltype(args)>(args)...); }, m_debugflag_task_args);
170 entry = &key2task_w->try_emplace(key.copy(), std::move(task), std::move(callback)).first->second;
171 }
172 else
173 {
174 // The task already exists. Store a pointer to the element in the unordered_map; note that
175 // pointers (and references) to elements of an unorder_map are never invalidated, unless
176 // the element itself is erased.
177 entry = &search->second;
178 }
179 break;
180 }
181 catch (std::exception const&)
182 {
183 // Another thread is already trying to convert its read-lock into a write-lock.
184 // Let that thread grab it and create the task.
185 m_key2task.rd2wryield();
186 }
187 }
188 if (task_created)
189 {
190 // It is safe to do this without a read or write lock on m_key2task, because no
191 // other threads are accessing the (just created) Task. They are just waiting for
192 // it to be finished.
193 key.initialize(entry->m_task);
194 // Wake up the Broker task.
195 Dout(dc::broker, "Wake up Broker to run the newly created task.");
196 signal(1);
197 }
198 else
199 {
200 bool finished = entry->m_finished.load(std::memory_order_acquire);
201 Dout(dc::broker(finished), "This task already finished.");
202 if (finished && m_is_immediate)
203 callback(entry->m_success);
204 else
205 {
206 Dout(dc::broker, "Adding callback to the queue and wake up the Broker task.");
207 // Queue the call back.
208 entry->m_callbacks.push(NEW(CallbackNode(std::move(callback))));
209 signal(1);
210 }
211 }
212 return entry->m_task;
213}
214
215template<TaskType Task, typename... Args>
217{
218 switch (run_state)
219 {
220 AI_CASE_RETURN(Broker_start);
221 AI_CASE_RETURN(Broker_do_work);
222 }
223 AI_NEVER_REACHED;
224}
225
226template<TaskType Task, typename... Args>
228{
229 return "Broker<>";
230}
231
232template<TaskType Task, typename... Args>
234{
235 switch (run_state)
236 {
237 case Broker_start:
238 set_state(Broker_do_work);
239 Dout(dc::broker, "Waiting for initial task creation...");
240 wait(1);
241 break;
242 case Broker_do_work:
243 {
244 // New callbacks have been added. A new task might also have been added however and needs to be run.
245 // Get read access to the map with tasks.
246 {
247 typename map_type::rat key2task_r(m_key2task);
248#ifdef CWDEBUG
249 int size = key2task_r->size();
250 Dout(dc::broker(mSMDebug), ((size == 1) ? "There is " : "There are ") << size << " registered task" << ((size == 1) ? "." : "s."));
251#endif
252 for (typename unordered_map_type::const_iterator it = key2task_r->begin(); it != key2task_r->end(); ++it)
253 {
254 statefultask::BrokerKey::unique_ptr const& key{it->first};
255 TaskPointerAndCallbackQueue const& entry{it->second};
256 Dout(dc::broker(mSMDebug)|continued_cf, "Processing entry " << entry << " [" << *key << "]; ");
257 if (!entry.m_running)
258 {
259 entry.m_running = true;
260 Dout(dc::broker(mSMDebug), "The task of this entry wasn't started yet. Calling run() now:");
261 // Run the newly created task, causing a wake up of the Broker when it is done.
262 entry.m_task->run(this, 1, signal_parent);
263 Dout(dc::finish, "returned from run().");
264 }
265 else if (entry.m_task->finished())
266 {
267 entry.m_success = !entry.m_task->aborted();
268 entry.m_finished.store(true, std::memory_order_release);
269 // The task finished.
270 CallbackNode* head;
271 // Call all the callbacks that were registered so far.
272 while ((head = static_cast<CallbackNode*>(entry.m_callbacks.pop())))
273 {
274 CallbackNode* node = static_cast<CallbackNode*>(head);
275 node->m_callback(entry.m_success);
276 delete node;
277 }
278 Dout(dc::finish, "callback queue cleared.");
279 }
280 else
281 Dout(dc::finish, "skipping: not finished.");
282 }
283 }
284 Dout(dc::broker, "Waiting for more work...");
285 wait(1);
286 break;
287 }
288 }
289}
290
291template<TaskType Task, typename... Args>
293{
294 DoutEntering(dc::broker(mSMDebug), "Broker<"<< libcwd::type_info_of<Task>().demangled_name() << ">::abort_impl()");
295 typename map_type::rat key2task_r(m_key2task);
296 for (typename unordered_map_type::const_iterator it = key2task_r->begin(); it != key2task_r->end(); ++it)
297 {
298 TaskPointerAndCallbackQueue const& entry{it->second};
299 entry.m_task->abort();
300 utils::threading::MpscNode* head;
301 while ((head = entry.m_callbacks.pop()))
302 {
303 CallbackNode* node = static_cast<CallbackNode*>(head);
304 delete node;
305 }
306 }
307}
308
309} // namespace task
Declaration of base class AIStatefulTask.
Definition: AIStatefulTask.h:96
static constexpr state_type state_end
The next state value to use for derived classes.
Definition: AIStatefulTask.h:137
AIStatefulTask(bool debug)
Definition: AIStatefulTask.h:352
uint32_t state_type
The type of run_state.
Definition: AIStatefulTask.h:98
Definition: BrokerKey.h:17
Definition: Broker.h:27
void multiplex_impl(state_type run_state) override
Called for base state bs_multiplex.
Definition: Broker.h:233
char const * state_str_impl(state_type run_state) const override
Called to stringify a run state for debugging output. Must be overridden.
Definition: Broker.h:216
char const * task_name_impl() const override
This can be used to get a human readable name of the most-derived class. It must be guaranteed to alw...
Definition: Broker.h:227
void abort_impl() override
Called for base state bs_abort.
Definition: Broker.h:292
void abort()
Definition: AIStatefulTask.cxx:1690
void run(Handler default_handler, std::function< void(bool)> cb_function)
Definition: AIStatefulTask.cxx:1192
Definition: AIStatefulTask.h:164
@ immediate
Construct an immediate Handler.
Definition: AIStatefulTask.h:175
Definition: BrokerKey.h:48
Definition: BrokerKey.h:43