AIStatefulTask ‐ Asynchronous, Stateful Task Scheduler library.

Thread-like task objects evolving through user-defined states.

AIPackagedTask.h
Go to the documentation of this file.
1
34#pragma once
35
37#include "AIDelayedFunction.h"
38#include "threadpool/AIObjectQueue.h"
39#include "threadpool/AIThreadPool.h"
40
41#ifdef EXAMPLE_CODE // undefined
42
// Example job for the packaged task: returns n! (the product 2 * 3 * ... * n).
// Every n smaller than 2 yields 1.
int factorial(int n)
{
  int product = 1;
  for (; n > 1; --n)
    product *= n;
  return product;
}
49
50class Task : public AIStatefulTask
51{
52 protected:
53 using direct_base_type = AIStatefulTask; // The base class of this task.
54 ~Task() override { } // The destructor must be protected.
55
56 // The different states of the task.
57 enum task_state_type {
58 Task_start = direct_base_type::state_end,
59 Task_done,
60 };
61
62 // Override virtual functions.
63 char const* state_str_impl(state_type run_state) const override;
64 void multiplex_impl(state_type run_state) override;
65
66 public:
67 static constexpr state_type state_end = Task_done + 1; // One beyond the largest state.
68 Task() : AIStatefulTask(DEBUG_ONLY(true)),
69 m_calculate_factorial(this, 1, &factorial) { } // Prepare to run `factorial' in its own thread.
70
71 private:
72 AIPackagedTask<int(int)> m_calculate_factorial;
73};
74
75void Task::multiplex_impl(state_type run_state)
76{
77 switch(run_state)
78 {
79 case Task_start:
80 {
81 m_calculate_factorial(5); // "Call the function" -- this just copies the argument(s) to be passed to the executing thread.
82 set_state(Task_dispatch);
83 }
84 case Task_dispatch:
85 {
86 if (!m_calculate_factorial.dispatch()) // Execute the function `factorial' in its own thread.
87 {
88 yield_frames(1);
89 break;
90 }
91 set_state(Task_done); // and continue running this task at state Task_done once
92 break; // `factorial' has finished executing.
93 }
94 case Task_done:
95 {
96 std::cout << "The factorial of 5 = " << m_calculate_factorial.get() << std::endl;
97 finish();
98 break;
99 }
100 }
101}
102#endif // EXAMPLE_CODE
103
104#ifndef DOXYGEN
// Primary template: declared only and deliberately left without a definition.
// Only the partial specialization for function types, AIPackagedTask<R(Args...)>,
// is defined.
template<typename Signature>
class AIPackagedTask;
107#endif
108
161template<typename R, typename ...Args>
162class AIPackagedTask<R(Args...)> : public AIFriendOfStatefulTask
163{
164 private:
165 enum { standby, deferred, executing, finished } m_phase; // Keeps track of whether the job is already executing or even finished.
167 AIDelayedFunction<R(Args...)> m_delayed_function;
168 AIQueueHandle m_queue_handle;
169
170 public:
179 AIPackagedTask(AIStatefulTask* parent_task, AIStatefulTask::condition_type condition, R (*fp)(Args...), AIQueueHandle object_queue_handle) :
180 AIFriendOfStatefulTask(parent_task), m_phase(standby), m_condition(condition), m_delayed_function(fp), m_queue_handle(object_queue_handle) { }
181
191 template<class C>
192 AIPackagedTask(AIStatefulTask* parent_task, AIStatefulTask::condition_type condition, C* object, R (C::*memfp)(Args...), AIQueueHandle object_queue_handle) :
193 AIFriendOfStatefulTask(parent_task), m_phase(standby), m_condition(condition), m_delayed_function(object, memfp), m_queue_handle(object_queue_handle) { }
194
196 ~AIPackagedTask();
197
199 void swap(AIPackagedTask& other) noexcept
200 {
201 std::swap(m_condition, other.m_condition);
202 m_delayed_function.swap(other.m_delayed_function);
203 std::swap(m_queue_handle, other.m_queue_handle);
204 }
205
207 void operator()(Args... args);
208
217 bool dispatch();
218
219#ifndef DOXYGEN
220 // If Args isn't empty (has_args) then we need this signature in order to be Callable.
221 template<bool has_args = sizeof... (Args) != 0, typename std::enable_if<has_args, int>::type = 0>
222 void operator()(); // Invoke the function.
223#endif
224
230 R get() const
231 {
232 ASSERT(m_phase == finished); // Call dispatch() until it returns true, before calling get().
233 return m_delayed_function.get(); // Get the result.
234 }
235
236 private:
237 void invoke();
238};
239
240template<typename R, typename ...Args>
241AIPackagedTask<R(Args...)>::~AIPackagedTask()
242{
243 // It should be impossible to destruct an AIPackagedTask while it is still
244 // executing when it is a member of parent_task; and that is the only way
245 // that this class should be used. The reason that is impossible is because the
246 // parent_task should be in a waiting state until we call m_task->signal(m_condition)
247 // in invoke() below, which we only do after m_phase is set to finished. Hence,
248 // the parent_task will not be destructed and therefore we won't be destructed either.
249 ASSERT(m_phase != executing);
250}
251
252// Invoke the function (inlined because it's used in two places below).
253template<typename R, typename ...Args>
254inline void AIPackagedTask<R(Args...)>::invoke()
255{
256 m_delayed_function.invoke();
257 m_phase = finished;
258 m_task->signal(m_condition);
259}
260
261// This is executed by a different thread.
262template<typename R, typename ...Args>
263template<bool has_args, typename std::enable_if<has_args, int>::type>
264void AIPackagedTask<R(Args...)>::operator()()
265{
266 invoke();
267}
268
269// Store the function arguments (or invoke task from executing thread).
270template<typename R, typename ...Args>
271void AIPackagedTask<R(Args...)>::operator()(Args... args)
272{
273 // When sizeof...(Args) == 0 then the second time this function is called is by executing thread.
274 // The first call has to be fast, so assume it's unlikely.
275 if (sizeof...(Args) == 0 &&
276 AI_UNLIKELY(m_phase == executing))
277 {
278 invoke();
279 return;
280 }
281
282 // If m_phase == deferred then you should have called retry().
283 ASSERT(m_phase == standby);
284
285 // Store arguments.
286 m_delayed_function(args...);
287}
288
289// Called by parent task to dispatch the job to its own thread.
290// After finishing the job, the parent will be signaled with
291// m_condition set during construction.
292//
293// Returns true upon a successful queue; false when the queue is full
294// in which case dispatch() can be tried again (a bit later).
295template<typename R, typename ...Args>
296bool AIPackagedTask<R(Args...)>::dispatch()
297{
298 {
299 // Stop a new queue from being created while we're working with a queue, because that could move the queue.
300 AIThreadPool& thread_pool = AIThreadPool::instance();
301 auto queues_r = thread_pool.queues_read_access();
302 // Lock the queue.
303 auto& queue_ref = thread_pool.get_queue(queues_r, m_queue_handle);
304 {
305 auto queue = queue_ref.producer_access();
306 if (queue.length() == queue_ref.capacity())
307 {
308 m_phase = deferred;
309 return false;
310 }
311 // Pass job to thread pool.
312 m_phase = executing;
313 queue.move_in(std::function<bool()>([this](){ this->invoke(); return false; }));
314 } // Unlock queue.
315 // Now that we added something to queue, wake up one thread if needed.
316 queue_ref.notify_one();
317 } // And we're done with the queue, so also unlock AIThreadPool::m_queues.
318
319 // Halt task until job finished.
320 wait_until([this](){ return m_phase == finished; }, m_condition);
321 return true;
322}
Declaration of AIDelayedFunction, an object storing a function pointer and its arguments.
Base class of classes that extend AIStatefulTask derived tasks as member function.
Definition: AIFriendOfStatefulTask.h:61
AIPackagedTask(AIStatefulTask *parent_task, AIStatefulTask::condition_type condition, R(*fp)(Args...), AIQueueHandle object_queue_handle)
Definition: AIPackagedTask.h:179
R get() const
Definition: AIPackagedTask.h:230
void swap(AIPackagedTask &other) noexcept
Exchange the state with that of other.
Definition: AIPackagedTask.h:199
AIPackagedTask(AIStatefulTask *parent_task, AIStatefulTask::condition_type condition, C *object, R(C::*memfp)(Args...), AIQueueHandle object_queue_handle)
Definition: AIPackagedTask.h:192
Definition: AIStatefulTask.h:96
AIStatefulTask(bool debug)
Definition: AIStatefulTask.h:352
virtual void multiplex_impl(state_type run_state)=0
Called for base state bs_multiplex.
uint32_t condition_type
The type of the skip_wait and idle bit masks.
Definition: AIStatefulTask.h:99
virtual char const * state_str_impl(state_type run_state) const
Called to stringify a run state for debugging output. Must be overridden.
Definition: AIStatefulTask.cxx:1273