General Utility Library for C++17 26.5.0
ThreadPool.h
Go to the documentation of this file.
1
26#ifndef GUL17_THREADPOOL_H_
27#define GUL17_THREADPOOL_H_
28
29#include <chrono>
30#include <condition_variable>
31#include <functional>
32#include <future>
33#include <limits>
34#include <memory>
35#include <mutex>
36#include <stdexcept>
37#include <thread>
38#include <vector>
39
40#include <gul17/cat.h>
41#include <gul17/traits.h>
42
43namespace gul17 {
44
45class ThreadPool;
46
47namespace detail {
48
49GUL_EXPORT
50std::shared_ptr<ThreadPool> lock_pool_or_throw(std::weak_ptr<ThreadPool> pool);
51
52} // namespace detail
53
/**
 * An enum describing the state of an individual task.
 *
 * The numeric values of pending and running are reused by
 * ThreadPool::InternalTaskState, so the order of the enumerators must not be changed.
 */
enum class TaskState
{
    pending,  ///< The task is waiting to be started.
    running,  ///< The task is currently being executed.
    complete, ///< The task has finished (successfully or by throwing an exception).
    canceled  ///< The task was removed from the queue before it was started.
};
73
111class ThreadPool : public std::enable_shared_from_this<ThreadPool>
112{
113public:
/// A unique identifier for a task.
using TaskId = std::uint64_t;

/// A unique identifier for a thread in the pool in the range of [0, count_threads()).
using ThreadId = std::vector<std::thread>::size_type;
119
136 template <typename T>
138 {
139 public:
147 {}
148
161 TaskHandle(TaskId id, std::future<T> future, std::shared_ptr<ThreadPool> pool)
162 : future_{ std::move(future) }
163 , id_{ id }
164 , pool_{ std::move(pool) }
165 {}
166
178 bool cancel()
179 {
180 future_ = {};
181 return detail::lock_pool_or_throw(pool_)->cancel_pending_task(id_);
182 }
183
191 {
192 if (not future_.valid())
193 throw std::logic_error("Canceled task has no result");
194 return future_.get();
195 }
196
209 bool is_complete() const
210 {
211 if (not future_.valid())
212 return false;
213
214 return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
215 }
216
230 {
231 const auto state = detail::lock_pool_or_throw(pool_)->get_task_state(id_);
232
233 if (state == InternalTaskState::unknown)
234 {
235 if (is_complete())
236 return TaskState::complete;
237 else
238 return TaskState::canceled;
239 }
240 return static_cast<TaskState>(state);
241 }
242
243 private:
244 std::future<T> future_;
245 TaskId id_{ 0 };
246 std::weak_ptr<ThreadPool> pool_;
247 };
248
249
/// A point in time on the system clock; used for the start time of tasks.
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
/// The duration type belonging to TimePoint; used for start delays of tasks.
using Duration = TimePoint::duration;

/// Default capacity for the task queue.
static constexpr std::size_t default_capacity{ 200 };

/// Maximum possible capacity for the task queue.
static constexpr std::size_t max_capacity{ 10'000'000 };

/// Maximum possible number of threads.
static constexpr std::size_t max_threads{ 10'000 };
261
269 ~ThreadPool();
270
308 template <typename Function>
310 add_task(Function fct, TimePoint start_time = {}, std::string name = {})
311 {
312 static_assert(
313 std::is_invocable<Function, ThreadPool&>::value
314 || std::is_invocable<Function>::value,
315 "Invalid function signature: Must be T fct() or T fct(ThreadPool&)");
316
317 using Result = std::invoke_result_t<Function, ThreadPool&>;
318
320 {
321 std::lock_guard<std::mutex> lock(mutex_);
322
323 if (is_full_i())
324 {
325 throw std::runtime_error(cat(
326 "Cannot add task: Pending queue has reached capacity (",
327 pending_tasks_.size(), ')'));
328 }
329
330 using PackagedTask = std::packaged_task<Result(ThreadPool&)>;
331
332 auto named_task_ptr = std::make_unique<NamedTaskImpl<PackagedTask>>(
333 PackagedTask{ std::move(fct) }, std::move(name));
334
336 next_task_id_, named_task_ptr->fct_.get_future(), shared_from_this() };
337
338 pending_tasks_.emplace_back(
339 next_task_id_, std::move(named_task_ptr), start_time);
340
341 ++next_task_id_;
342
343 return handle;
344 }();
345
346 cv_.notify_one();
347
348 return task_handle;
349 }
350
351 template <typename Function,
352 std::enable_if_t<std::is_invocable<Function>::value, bool> = true>
354 add_task(Function fct, TimePoint start_time = {}, std::string name = {})
355 {
356 return add_task(
357 [f = std::move(fct)](ThreadPool&) mutable { return f(); },
358 start_time, std::move(name));
359 }
360
361 template <typename Function,
362 std::enable_if_t<std::is_invocable<Function, ThreadPool&>::value, bool> = true>
364 add_task(Function fct, Duration delay_before_start, std::string name = {})
365 {
366 return add_task(std::move(fct),
367 std::chrono::system_clock::now() + delay_before_start, std::move(name));
368 }
369
370 template <typename Function,
371 std::enable_if_t<std::is_invocable<Function>::value, bool> = true>
373 add_task(Function fct, Duration delay_before_start, std::string name = {})
374 {
375 return add_task(std::move(fct),
376 std::chrono::system_clock::now() + delay_before_start, std::move(name));
377 }
378
379 template <typename Function,
380 std::enable_if_t<std::is_invocable<Function, ThreadPool&>::value, bool> = true>
382 add_task(Function fct, std::string name)
383 {
384 return add_task(std::move(fct), TimePoint{}, std::move(name));
385 }
386
387 template <typename Function,
388 std::enable_if_t<std::is_invocable<Function>::value, bool> = true>
390 add_task(Function fct, std::string name)
391 {
392 return add_task(std::move(fct), TimePoint{}, std::move(name));
393 }
394
403 GUL_EXPORT
404 std::size_t cancel_pending_tasks();
405
407 GUL_EXPORT
408 std::size_t capacity() const noexcept { return capacity_; }
409
411 GUL_EXPORT
412 std::size_t count_pending() const;
413
415 GUL_EXPORT
416 std::size_t count_threads() const noexcept;
417
419 GUL_EXPORT
420 std::vector<std::string> get_pending_task_names() const;
421
423 GUL_EXPORT
424 std::vector<std::string> get_running_task_names() const;
425
436 GUL_EXPORT
438
440 GUL_EXPORT
441 bool is_full() const noexcept;
442
447 GUL_EXPORT
448 bool is_idle() const;
449
451 GUL_EXPORT
453
463 GUL_EXPORT
465 std::size_t num_threads, std::size_t capacity = default_capacity);
466
467private:
475 enum class InternalTaskState
476 {
478 pending = static_cast<int>(TaskState::pending),
480 running = static_cast<int>(TaskState::running),
482 unknown
483 };
484
485 struct NamedTask
486 {
487 NamedTask(std::string name)
488 : name_{ std::move(name) }
489 {}
490
491 virtual ~NamedTask() = default;
492 virtual void operator()(ThreadPool& pool) = 0;
493
494 std::string name_;
495 };
496
497 template <typename FunctionType>
498 struct NamedTaskImpl : public NamedTask
499 {
500 public:
501 NamedTaskImpl(FunctionType fct, std::string name)
502 : NamedTask{ std::move(name) }
503 , fct_{ std::move(fct) }
504 {}
505
506 void operator()(ThreadPool& pool) override { fct_(pool); }
507
508 FunctionType fct_;
509 };
510
511 struct Task
512 {
513 TaskId id_{};
514 std::unique_ptr<NamedTask> named_task_;
515 TimePoint start_time_{}; // When the task is to be started (at least no earlier)
516
517 Task() = default;
518
519 Task(TaskId task_id, std::unique_ptr<NamedTask> named_task, TimePoint start_time)
520 : id_{ task_id }
521 , named_task_{ std::move(named_task) }
522 , start_time_{ start_time }
523 {}
524 };
525
526 std::size_t capacity_{ 0 };
527
532 std::vector<std::thread> threads_;
533
539 thread_local static ThreadId thread_id_;
540
545 std::condition_variable cv_;
546
547 mutable std::mutex mutex_; // Protects the following variables
548 std::vector<Task> pending_tasks_;
549 std::vector<TaskId> running_task_ids_;
550 std::vector<std::string> running_task_names_;
551 TaskId next_task_id_ = 0;
552 bool shutdown_requested_{ false };
553
554
572 ThreadPool(std::size_t num_threads, std::size_t capacity);
573
586 GUL_EXPORT
587 bool cancel_pending_task(TaskId task_id);
588
598 GUL_EXPORT
599 InternalTaskState get_task_state(TaskId task_id) const;
600
605 GUL_EXPORT
606 bool is_full_i() const noexcept;
607
614 void perform_work(std::size_t thread_index);
615};
616
631
633
634} // namespace gul17
635
636#endif // GUL17_THREADPOOL_H_
Declaration of the overload set for cat() and of the associated class ConvertingStringView.
A handle for a task that has (or had) been enqueued on a ThreadPool.
Definition ThreadPool.h:138
T get_result()
Block until the task has finished and return its result.
Definition ThreadPool.h:190
TaskHandle(TaskId id, std::future< T > future, std::shared_ptr< ThreadPool > pool)
Construct a TaskHandle.
Definition ThreadPool.h:161
bool cancel()
Remove the task from the queue if it is still pending.
Definition ThreadPool.h:178
TaskHandle()
Default-construct an invalid TaskHandle.
Definition ThreadPool.h:146
TaskState get_state() const
Determine if the task is running, waiting to be started, completed, or has been canceled.
Definition ThreadPool.h:229
bool is_complete() const
Determine whether the task has completed.
Definition ThreadPool.h:209
A pool of worker threads with a task queue.
Definition ThreadPool.h:112
~ThreadPool()
Destruct the ThreadPool and join all threads.
Definition ThreadPool.cc:72
std::vector< std::thread >::size_type ThreadId
A unique identifier for a thread in the pool in the range of [0, count_threads()).
Definition ThreadPool.h:118
static constexpr std::size_t default_capacity
Default capacity for the task queue.
Definition ThreadPool.h:254
GUL_EXPORT std::size_t count_threads() const noexcept
Return the number of threads in the pool.
Definition ThreadPool.cc:117
TaskHandle< std::invoke_result_t< Function, ThreadPool & > > add_task(Function fct, TimePoint start_time={}, std::string name={})
Enqueue a task.
Definition ThreadPool.h:310
GUL_EXPORT bool is_shutdown_requested() const
Determine whether the thread pool has been requested to shut down.
Definition ThreadPool.cc:184
GUL_EXPORT std::vector< std::string > get_running_task_names() const
Return a vector with the names of the tasks that are currently running.
Definition ThreadPool.cc:135
static constexpr std::size_t max_capacity
Maximum possible capacity for the task queue.
Definition ThreadPool.h:257
GUL_EXPORT bool is_idle() const
Return true if the pool has neither pending tasks nor tasks that are currently being executed.
Definition ThreadPool.cc:178
GUL_EXPORT bool is_full() const noexcept
Determine whether the queue for pending tasks is full (at capacity).
Definition ThreadPool.cc:167
static GUL_EXPORT std::shared_ptr< ThreadPool > make_shared(std::size_t num_threads, std::size_t capacity=default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for enqueuing tasks.
Definition ThreadPool.cc:190
GUL_EXPORT std::vector< std::string > get_pending_task_names() const
Return a vector with the names of the tasks that are waiting to be executed.
Definition ThreadPool.cc:122
GUL_EXPORT ThreadId get_thread_id() const
Return the thread pool ID of the current thread.
Definition ThreadPool.cc:159
std::uint64_t TaskId
A unique identifier for a task.
Definition ThreadPool.h:115
GUL_EXPORT std::size_t count_pending() const
Return the number of pending tasks.
Definition ThreadPool.cc:111
GUL_EXPORT std::size_t cancel_pending_tasks()
Remove all pending tasks from the queue.
Definition ThreadPool.cc:101
GUL_EXPORT std::size_t capacity() const noexcept
Return the maximum number of pending tasks that can be queued.
Definition ThreadPool.h:408
static constexpr std::size_t max_threads
Maximum possible number of threads.
Definition ThreadPool.h:260
std::shared_ptr< ThreadPool > make_thread_pool(std::size_t num_threads, std::size_t capacity=ThreadPool::default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for queuing tasks.
Definition ThreadPool.h:626
TaskState
An enum describing the state of an individual task.
Definition ThreadPool.h:67
@ running
The task is currently being executed.
@ pending
The task is waiting to be started.
@ canceled
The task was removed from the queue before it was started.
@ complete
The task has finished (successfully or by throwing an exception).
auto constexpr bit_set(unsigned bit) noexcept -> ReturnT
Set a bit in an integral type.
Definition bit_manip.h:124
std::string cat()
Efficiently concatenate an arbitrary number of strings and numbers.
Definition cat.h:100
Namespace gul17 contains all functions and classes of the General Utility Library.
Definition doxygen.h:29
Some metaprogramming traits for the General Utility Library.