AIStatefulTask ‐ Asynchronous, Stateful Task Scheduler library.

Threads-like task objects evolving through user-defined states.

ResourcePool.h
#pragma once

#include "AIStatefulTask.h"
#include <type_traits>
#include <deque>
#include <array>
#include <vector>
#include <map>

namespace statefultask {

// API to allocate batches of resources of type ResourceType.
//
// Provided an array with N ResourceType's, it is filled with new ResourceType values.
//
// An array with ResourceType's, in any order and from any batch, can be freed;
// each ResourceType in this regard is treated as an individual object.
//
class ResourceFactory
{
  // ptr_to_array_of_resources must point to an array of `size` resources,
  // the type of which is defined by the derived class.
  virtual void do_allocate(void* ptr_to_array_of_resources, size_t size) = 0;
  virtual void do_free(void const* ptr_to_array_of_resources, size_t size) = 0;

 public:
  template<typename ResourceType, size_t size>
  [[gnu::always_inline]] void allocate(std::array<ResourceType, size>& resources_out)
  {
    do_allocate(resources_out.data(), resources_out.size());
  }

  template<typename ResourceType>
  [[gnu::always_inline]] void allocate(std::vector<ResourceType>& resources_out)
  {
    do_allocate(resources_out.data(), resources_out.size());
  }

  template<typename ResourceType, size_t size>
  [[gnu::always_inline]] void free(std::array<ResourceType, size> const& resources)
  {
    do_free(resources.data(), resources.size());
  }

  template<typename ResourceType>
  [[gnu::always_inline]] void free(std::vector<ResourceType> const& resources)
  {
    do_free(resources.data(), resources.size());
  }
};

template<typename T>
concept ConceptResourceFactory = std::is_base_of_v<ResourceFactory, T> && std::is_constructible_v<typename T::resource_type>;

// A pool class is deliberately not thread-safe.
// Only a single thread at a time should call any of its member functions.
template<ConceptResourceFactory RF>
class ResourcePool
{
 public:
  using resource_factory_type = RF;
  using resource_type = typename resource_factory_type::resource_type;
  using free_list_type = std::deque<resource_type, utils::DequeAllocator<resource_type>>;

  struct EventRequest
  {
    AIStatefulTask* m_task;             // Call m_task->signal(m_condition) when m_number_of_needed_resources are available.
    int m_number_of_needed_resources;
    AIStatefulTask::condition_type m_condition;

    EventRequest(AIStatefulTask* task, int number_of_needed_resources, AIStatefulTask::condition_type condition) :
      m_task(task), m_number_of_needed_resources(number_of_needed_resources), m_condition(condition) { }
  };
  using event_requests_container_type = std::vector<EventRequest>;
  using event_requests_type = aithreadsafe::Wrapper<event_requests_container_type, aithreadsafe::policy::Primitive<std::mutex>>;

 private:
  size_t m_max_allocations;             // A limit on the allowed number of allocations.
  size_t m_allocations;                 // The current number of allocations.
  size_t m_acquires;                    // The current number of acquired resources that weren't released yet.
  resource_factory_type m_factory;      // Factory to create and destroy resources.
  typename free_list_type::allocator_type& m_allocator_ref;    // A reference to the allocator used for m_free_list.
  free_list_type m_free_list;           // A FIFO queue for released resources.
  event_requests_type m_event_requests; // A list of tasks that want to be woken up when new resources are available.

 public:
  // The deque allocator is kept outside of the ResourcePool class so that it can be shared with other objects (it is thread-safe).
  // Any utils::DequeAllocator that allocates objects with a size equal to the size of resource_type can be used, provided it
  // has a lifetime that exceeds that of the ResourcePool.
  template<typename T, typename... Args>
  ResourcePool(size_t max_allocations, utils::DequeAllocator<T>& allocator, Args const&... factory_args) :
    m_max_allocations(max_allocations), m_allocations(0), m_acquires(0), m_factory(factory_args...), m_allocator_ref(allocator), m_free_list(allocator)
  {
    static_assert(sizeof(T) == sizeof(resource_type), "The allocator passed must allocate chunks of the right size.");
  }

  // Accessor.
  resource_factory_type const& factory() const { return m_factory; }

  // Acquire resources; either from the pool or by allocating more resources.
  // Returns the number of actually acquired resources. It can be less than size when m_max_allocations is reached.
  [[nodiscard]] size_t acquire(resource_type* resources, size_t const size);

  // Add resources to the pool (m_free_list).
  void release(resource_type const* resources, size_t size);

  template<size_t size>
  [[nodiscard, gnu::always_inline]] size_t acquire(std::array<resource_type, size>& resources_out)
  {
    return acquire(resources_out.data(), resources_out.size());
  }

  [[nodiscard, gnu::always_inline]] size_t acquire(std::vector<resource_type>& resources_out)
  {
    return acquire(resources_out.data(), resources_out.size());
  }

  template<size_t size>
  [[gnu::always_inline]] void release(std::array<resource_type, size> const& resources)
  {
    release(resources.data(), resources.size());
  }

  [[gnu::always_inline]] void release(std::vector<resource_type> const& resources)
  {
    release(resources.data(), resources.size());
  }

  // Register a task to be notified when more resources are returned to the pool.
  // This will call task->signal(condition) when at least n resources can be allocated.
  // It is preferable that task operates in immediate-mode so that new resources are
  // handed out in the order that subscribe was called; but this also means that
  // the thread that calls release will continue to run task. It might therefore be
  // necessary to switch task to immediate mode when waiting for condition, and out
  // of it immediately after acquiring the resources.
  void subscribe(int n, AIStatefulTask* task, AIStatefulTask::condition_type condition);
};

template<ConceptResourceFactory RF>
size_t ResourcePool<RF>::acquire(resource_type* resources, size_t const size)
{
  DoutEntering(dc::notice|continued_cf, "ResourcePool<" << type_info_of<RF>().demangled_name() << ">::acquire(resources (" << resources << "), " << size << ") = ");
  // The number of already allocated, free resources.
  size_t const in_pool = m_free_list.size();
  // The index into resources[] that must be filled next.
  size_t index = 0;
  // First get resources from the pool, if any.
  if (index < in_pool)
  {
    auto resource = m_free_list.begin();
    // Get size resources from the free list, or however many are in there.
    while (index < std::min(in_pool, size))
    {
      Dout(dc::notice, "resources[" << index << "] = " << *resource << " (from m_free_list)");
      resources[index++] = *resource++;
    }
    // Erase the used resources from the free list.
    m_free_list.erase(m_free_list.begin(), resource);
  }
  // Get the remaining resources from m_factory, if any.
  if (index < size)
  {
    // We are not allowed to allocate more than m_max_allocations resources in total.
    // This line allows for m_allocations to be larger than m_max_allocations (causing to_allocate
    // to become zero) in case of future dynamic adjustments of m_max_allocations.
    size_t to_allocate = std::min(size - index, std::max(m_max_allocations, m_allocations) - m_allocations);
    if (to_allocate > 0)
    {
      m_factory.do_allocate(&resources[index], to_allocate);
#ifdef CWDEBUG
      for (int j = 0; j < to_allocate; ++j)
        Dout(dc::notice, "resources[" << (index + j) << "] = " << resources[index + j] << " (from m_factory)");
#endif
      index += to_allocate;
      m_allocations += to_allocate;
    }
  }
  // Return the number of actually acquired resources.
  Dout(dc::finish, index);
  return index;
}

template<ConceptResourceFactory RF>
void ResourcePool<RF>::release(resource_type const* resources, size_t size)
{
  DoutEntering(dc::notice, "ResourcePool<" << type_info_of<RF>().demangled_name() << ">::release(resources (" << resources << "), " << size << ")");
  // In case of future dynamic adjustments of m_max_allocations.
  if (AI_UNLIKELY(m_allocations > m_max_allocations))
  {
    // Free some or all of the resources immediately.
    size_t to_free = std::min(size, m_allocations - m_max_allocations);
    if (to_free > 0)    // Might be zero when size is zero for some reason. No need to assert in that case.
    {
#ifdef CWDEBUG
      for (int j = 0; j < to_free; ++j)
        Dout(dc::notice, resources[j] << " is returned to m_factory.");
#endif
      m_factory.do_free(resources, to_free);
    }
    m_allocations -= to_free;
    size -= to_free;
    resources += to_free;
  }
  // Put `size` resources on the free list.
  size_t index = 0;
  while (index < size)
  {
    Dout(dc::notice, resources[index] << " put back on m_free_list.");
    m_free_list.push_back(resources[index++]);
  }
  // Notify waiting tasks.
  std::vector<EventRequest> must_be_notified;
  {
    typename event_requests_type::wat event_requests_w(m_event_requests);
    if (AI_LIKELY(event_requests_w->empty()))
      return;
    size_t available_resources = m_max_allocations - m_allocations + m_free_list.size();
    // Copy elements from event_requests_w to must_be_notified until we run out of available_resources.
    typename event_requests_container_type::iterator event_request = event_requests_w->begin();
    while (event_request != event_requests_w->end())
    {
      if (event_request->m_number_of_needed_resources > available_resources)
        break;
      must_be_notified.push_back(*event_request);
      available_resources -= event_request->m_number_of_needed_resources;
      ++event_request;
    }
    // Remove the copied elements.
    event_requests_w->erase(event_requests_w->begin(), event_request);
  }
  // While m_event_requests is unlocked, notify the lucky tasks.
  for (auto const& event_request : must_be_notified)
    event_request.m_task->signal(event_request.m_condition);
}

template<ConceptResourceFactory RF>
void ResourcePool<RF>::subscribe(int n, AIStatefulTask* task, AIStatefulTask::condition_type condition)
{
  typename event_requests_type::wat event_requests_w(m_event_requests);
  typename event_requests_container_type::iterator iter = event_requests_w->begin();
  // The same task can call this "spuriously". Let's just remember the largest n (although if that happens n should be the same).
  while (iter != event_requests_w->end())
  {
    if (iter->m_task == task)
    {
      if (AI_UNLIKELY(n > iter->m_number_of_needed_resources))
        iter->m_number_of_needed_resources = n;
      return;
    }
    ++iter;
  }
  event_requests_w->emplace_back(task, n, condition);
}

} // namespace statefultask
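
The ResourceFactory above is only an interface: a concrete factory derives from statefultask::ResourceFactory, exposes a (default-constructible) resource_type, and implements do_allocate and do_free. The sketch below is purely illustrative and not part of the library; IntHandleFactory and its integer "handles" are made-up names, and a real factory would create and destroy actual resources (file descriptors, buffers, graphics handles, ...).

// Hypothetical example (not part of ResourcePool.h); assumes ResourcePool.h is included.
class IntHandleFactory : public statefultask::ResourceFactory
{
 public:
  using resource_type = int;    // Required by ConceptResourceFactory (must be constructible).

  // The overrides are public here so that ResourcePool<IntHandleFactory> can call them on its m_factory.
  void do_allocate(void* ptr_to_array_of_resources, size_t size) override
  {
    resource_type* handles = static_cast<resource_type*>(ptr_to_array_of_resources);
    for (size_t i = 0; i < size; ++i)
      handles[i] = m_next_handle++;     // "Create" a new resource.
  }

  void do_free(void const* ptr_to_array_of_resources, size_t size) override
  {
    // Plain ints need no destruction; a real factory would release the underlying resources here.
  }

 private:
  resource_type m_next_handle = 0;
};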
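
What follows is a minimal usage sketch under stated assumptions: the hypothetical IntHandleFactory above, a pool constructed elsewhere with a suitable utils::DequeAllocator (its construction belongs to ai-utils and is omitted here), and a free_condition chosen by the calling task. It only illustrates the acquire / release / subscribe flow and is not library code.

// Hypothetical usage; all names other than the ResourcePool API itself are illustrative.
void try_to_get_handles(statefultask::ResourcePool<IntHandleFactory>& pool,
    AIStatefulTask* task, AIStatefulTask::condition_type free_condition)
{
  std::array<int, 4> handles;
  size_t acquired = pool.acquire(handles);      // Can return less than 4 once m_max_allocations is reached.

  if (acquired < handles.size())
  {
    // Not enough resources: hand back what we did get and ask to be signalled
    // once at least 4 resources can be acquired again.
    pool.release(handles.data(), acquired);
    pool.subscribe(4, task, free_condition);
    return;                                     // The task would now wait for free_condition.
  }

  // ... use handles[0..3] ...

  pool.release(handles);                        // Return all 4 to the pool's free list.
}

As the header states, the pool itself is deliberately not thread-safe: only a single thread at a time should call its member functions, even though the shared DequeAllocator and the internal event-request list are protected separately.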