5#include "threadsafe/AIReadWriteMutex.h"
6#include "utils/threading/MpscQueue.h"
7#include "utils/threading/Gate.h"
11#include "utils/has_print_on.h"
14#if defined(CWDEBUG) && !defined(DOXYGEN)
15NAMESPACE_DEBUG_CHANNELS_START
16extern channel_ct broker;
17NAMESPACE_DEBUG_CHANNELS_END
22using utils::has_print_on::operator<<;
25template<TaskType Task,
typename... Args>
32 enum broker_state_type {
38 static constexpr state_type state_end = Broker_do_work + 1;
41 struct CallbackNode : utils::threading::MpscNode
43 std::function<void(
bool)> m_callback;
45 CallbackNode(std::function<
void(
bool)>&& callback) : m_callback(std::move(callback)) { }
54 struct TaskPointerAndCallbackQueue {
55 boost::intrusive_ptr<Task>
const m_task;
57 mutable utils::threading::MpscQueue m_callbacks;
62 mutable std::atomic<bool> m_finished;
63 mutable bool m_success;
65 mutable bool m_running;
67 TaskPointerAndCallbackQueue(boost::intrusive_ptr<Task>&& task, std::function<
void(
bool)>&& callback) :
68 m_task(std::move(task)), m_finished(
false), m_success(
false), m_running(
false) { m_callbacks.push(NEW(CallbackNode(std::move(callback)))); }
71 void print_on(std::ostream& os)
const
73 os <<
'"' << libcwd::type_info_of<Task>().demangled_name() <<
'"';
77 using unordered_map_type = std::unordered_map<
78 statefultask::BrokerKey::unique_ptr,
79 TaskPointerAndCallbackQueue,
82 using map_type = aithreadsafe::Wrapper<unordered_map_type, aithreadsafe::policy::ReadWrite<AIReadWriteMutex>>;
86 utils::threading::Gate m_finished;
87 std::tuple<CWDEBUG_ONLY(
bool,) Args...> m_debugflag_task_args;
90 ~Broker()
override { DoutEntering(dc::broker(mSMDebug),
"~Broker() [" << (
void*)
this <<
"]"); }
97 Broker(CWDEBUG_ONLY(
bool debug,) Args... task_args) :
98 AIStatefulTask(CWDEBUG_ONLY(debug)), m_is_immediate(
false), m_debugflag_task_args(CWDEBUG_ONLY(debug,) task_args...)
100 DoutEntering(dc::broker(mSMDebug),
"Broker<" <<
101 libcwd::type_info_of<Task>().demangled_name() <<
102 ((LibcwDoutStream << ... << (std::string(
", ") + libcwd::type_info_of<Args>().demangled_name())),
">::Broker(") <<
103 join(
", ", task_args...) <<
") [" << (
void*)
this <<
"]");
111 m_is_immediate = handler.is_immediate();
115 Dout(dc::broker,
"task::Broker<" << libcwd::type_info_of<Task>().demangled_name() <<
"> terminated.");
127 void terminate(std::function<
void (Task*)> callback)
130 typename map_type::wat key2task_w(m_key2task);
131 for (
auto& element : *key2task_w)
132 callback(element.second.m_task.get());
137 boost::intrusive_ptr<Task const> run(
statefultask::BrokerKey const& key, std::function<
void(
bool)>&& callback);
140template<TaskType Task,
typename... Args>
143 DoutEntering(dc::broker,
"Broker<" << libcwd::type_info_of<Task>().demangled_name() <<
", void>::run(" << key <<
", callback)");
149 typename unordered_map_type::mapped_type
const* entry;
157 typename map_type::rat key2task_r(m_key2task);
162 if ((task_created = search == key2task_r->end()))
166 typename map_type::wat key2task_w(key2task_r);
169 boost::intrusive_ptr<Task> task = std::apply([](
auto&&... args){
return statefultask::create<Task>(std::forward<
decltype(args)>(args)...); }, m_debugflag_task_args);
170 entry = &key2task_w->try_emplace(key.copy(), std::move(task), std::move(callback)).first->second;
177 entry = &search->second;
181 catch (std::exception
const&)
185 m_key2task.rd2wryield();
193 key.initialize(entry->m_task);
195 Dout(dc::broker,
"Wake up Broker to run the newly created task.");
200 bool finished = entry->m_finished.load(std::memory_order_acquire);
201 Dout(dc::broker(finished),
"This task already finished.");
202 if (finished && m_is_immediate)
203 callback(entry->m_success);
206 Dout(dc::broker,
"Adding callback to the queue and wake up the Broker task.");
208 entry->m_callbacks.push(NEW(CallbackNode(std::move(callback))));
212 return entry->m_task;
215template<TaskType Task,
typename... Args>
220 AI_CASE_RETURN(Broker_start);
221 AI_CASE_RETURN(Broker_do_work);
226template<TaskType Task,
typename... Args>
232template<TaskType Task,
typename... Args>
238 set_state(Broker_do_work);
239 Dout(dc::broker,
"Waiting for initial task creation...");
247 typename map_type::rat key2task_r(m_key2task);
249 int size = key2task_r->size();
250 Dout(dc::broker(mSMDebug), ((size == 1) ?
"There is " :
"There are ") << size <<
" registered task" << ((size == 1) ?
"." :
"s."));
252 for (
typename unordered_map_type::const_iterator it = key2task_r->begin(); it != key2task_r->end(); ++it)
254 statefultask::BrokerKey::unique_ptr
const& key{it->first};
255 TaskPointerAndCallbackQueue
const& entry{it->second};
256 Dout(dc::broker(mSMDebug)|continued_cf,
"Processing entry " << entry <<
" [" << *key <<
"]; ");
257 if (!entry.m_running)
259 entry.m_running =
true;
260 Dout(dc::broker(mSMDebug),
"The task of this entry wasn't started yet. Calling run() now:");
262 entry.m_task->run(
this, 1, signal_parent);
263 Dout(dc::finish,
"returned from run().");
265 else if (entry.m_task->finished())
267 entry.m_success = !entry.m_task->aborted();
268 entry.m_finished.store(
true, std::memory_order_release);
272 while ((head =
static_cast<CallbackNode*
>(entry.m_callbacks.pop())))
274 CallbackNode* node =
static_cast<CallbackNode*
>(head);
275 node->m_callback(entry.m_success);
278 Dout(dc::finish,
"callback queue cleared.");
281 Dout(dc::finish,
"skipping: not finished.");
284 Dout(dc::broker,
"Waiting for more work...");
291template<TaskType Task,
typename... Args>
294 DoutEntering(dc::broker(mSMDebug),
"Broker<"<< libcwd::type_info_of<Task>().demangled_name() <<
">::abort_impl()");
295 typename map_type::rat key2task_r(m_key2task);
296 for (
typename unordered_map_type::const_iterator it = key2task_r->begin(); it != key2task_r->end(); ++it)
298 TaskPointerAndCallbackQueue
const& entry{it->second};
299 entry.m_task->abort();
300 utils::threading::MpscNode* head;
301 while ((head = entry.m_callbacks.pop()))
303 CallbackNode* node =
static_cast<CallbackNode*
>(head);
Declaration of base class AIStatefulTask.
Definition: AIStatefulTask.h:96
static constexpr state_type state_end
The next state value to use for derived classes.
Definition: AIStatefulTask.h:137
AIStatefulTask(bool debug)
Definition: AIStatefulTask.h:352
uint32_t state_type
The type of run_state.
Definition: AIStatefulTask.h:98
Definition: BrokerKey.h:17
void multiplex_impl(state_type run_state) override
Called for base state bs_multiplex.
Definition: Broker.h:233
char const * state_str_impl(state_type run_state) const override
Called to stringify a run state for debugging output. Must be overridden.
Definition: Broker.h:216
char const * task_name_impl() const override
This can be used to get a human readable name of the most-derived class. It must be guaranteed to alw...
Definition: Broker.h:227
void abort_impl() override
Called for base state bs_abort.
Definition: Broker.h:292
void abort()
Definition: AIStatefulTask.cxx:1690
void run(Handler default_handler, std::function< void(bool)> cb_function)
Definition: AIStatefulTask.cxx:1192
Definition: AIStatefulTask.h:164
@ immediate
Construct an immediate Handler.
Definition: AIStatefulTask.h:175
Definition: BrokerKey.h:48
Definition: BrokerKey.h:43