AIStatefulTask ‐ Asynchronous, Stateful Task Scheduler library.

Threads-like task objects evolving through user-defined states.

TaskEvent.h
1#pragma once
2
3#include "AIStatefulTask.h"
5#include "utils/DequeAllocator.h"
6#include <deque>
7#include "debug.h"
8
9namespace statefultask {
10
11// TaskEvent
12//
13// Usage:
14//
15// Task A has a public member of type TaskEvent and calls
16// trigger() on it when the event occurs. For example,
17//
18// TaskEvent m_foo_available;
19// Foo get_foo(); // Only call this when foo is available.
20//
21// Other tasks have a state that does:
22//
23// case WaitForFoo:
24// task_a->m_foo_available.register_task(this, foo_available_conditon);
25// set_state(FooAvailable);
26// wait(foo_available_conditon);
27// break;
28// case FooAvailable:
29// {
30// Foo foo = task_a->get_foo();
31//
33{
34 private:
35 using data_type = std::pair<boost::intrusive_ptr<AIStatefulTask>, AIStatefulTask::condition_type>;
36 using container_type = std::deque<data_type, utils::DequeAllocator<data_type>>;
37 using registered_tasks_t = aithreadsafe::Wrapper<container_type, aithreadsafe::policy::Primitive<std::mutex>>;
38
39 mutable utils::NodeMemoryResource m_nmr{AIMemoryPagePool::instance()};
40 mutable registered_tasks_t m_registered_tasks{utils::DequeAllocator<data_type>(m_nmr)};
41 std::atomic_bool m_triggered = false;
42
43 void trigger(container_type const& waiting_tasks) const
44 {
45 for (auto& p : waiting_tasks)
46 p.first->signal(p.second);
47 }
48
49 public:
50 // Call task->signal(condition) if trigger() was already called, otherwise
51 // keep task alive and call signal when trigger is called.
52 // Not really "const" because it alters m_registered_tasks and m_nmr, but this way is
53 // more convenient: now we can call m_other_task->some_event.register_task(this, my_condition)
54 // from a task where m_other_task is pointer to (otherwise) const.
55 void register_task(AIStatefulTask* task, AIStatefulTask::condition_type condition) const
56 {
57 using std::memory_order;
58 if (m_triggered.load(memory_order::relaxed))
59 task->signal(condition);
60 else
61 {
62 container_type* waiting_tasks;
63 bool need_trigger = false;
64 {
65 registered_tasks_t::wat registered_tasks_w(m_registered_tasks);
66 registered_tasks_w->emplace_back(task, condition);
67 // In the unlikely case that the event was triggered between reading
68 // m_triggered at the top of this function and locking m_registered_tasks:
69 if (AI_UNLIKELY(m_triggered.load(memory_order::relaxed)))
70 {
71 waiting_tasks = new container_type(utils::DequeAllocator<data_type>(m_nmr));
72 waiting_tasks->swap(*registered_tasks_w);
73 need_trigger = true;
74 }
75 } // Unlock m_registered_tasks.
76 if (AI_UNLIKELY(need_trigger))
77 {
78 trigger(*waiting_tasks);
79 delete waiting_tasks;
80 }
81 }
82 }
83
84 // Mark that this event was triggered and call signal on the tasks that already
85 // called register_task.
86 void trigger()
87 {
88 using std::memory_order;
89 // It makes no sense to trigger it twice.
90 ASSERT(!m_triggered.load(memory_order::relaxed));
91 m_triggered.store(true, memory_order::relaxed);
92 // Move the deque to a local variable, so that we don't keep the lock on m_registered_tasks while calling the signal()'s.
93 container_type waiting_tasks(std::move(*registered_tasks_t::rat(m_registered_tasks)));
94 trigger(waiting_tasks);
95 }
96};
97
98} // namespace statefultask
Declaration of base class AIStatefulTask.
Declaration of class DefaultMemoryPagePool.
Definition: AIStatefulTask.h:96
uint32_t condition_type
The type of the skip_wait and idle bit masks.
Definition: AIStatefulTask.h:99
static utils::MemoryPagePool & instance()
Definition: DefaultMemoryPagePool.h:142
Definition: TaskEvent.h:33
bool signal(condition_type condition)
Definition: AIStatefulTask.cxx:1643
Tasks defined by the library project are put into this namespace.
Definition: AIStatefulTask.h:857