General Utility Library for C++17 25.4.1
ThreadPool.h
Go to the documentation of this file.
1
23// SPDX-License-Identifier: LGPL-2.1-or-later
24
25#ifndef GUL17_THREADPOOL_H_
26#define GUL17_THREADPOOL_H_
27
28#include <chrono>
29#include <condition_variable>
30#include <functional>
31#include <future>
32#include <limits>
33#include <memory>
34#include <mutex>
35#include <stdexcept>
36#include <thread>
37#include <vector>
38
39#include <gul17/cat.h>
40#include <gul17/traits.h>
41
42namespace gul17 {
43
44class ThreadPool;
45
46namespace detail {
47
48GUL_EXPORT
49std::shared_ptr<ThreadPool> lock_pool_or_throw(std::weak_ptr<ThreadPool> pool);
50
51} // namespace detail
52
65enum class TaskState
66{
67 pending,
68 running,
69 complete,
71};
72
110class ThreadPool : public std::enable_shared_from_this<ThreadPool>
111{
112public:
114 using TaskId = std::uint64_t;
115
117 using ThreadId = std::vector<std::thread>::size_type;
118
135 template <typename T>
137 {
138 public:
146 {}
147
160 TaskHandle(TaskId id, std::future<T> future, std::shared_ptr<ThreadPool> pool)
161 : future_{ std::move(future) }
162 , id_{ id }
163 , pool_{ std::move(pool) }
164 {}
165
177 bool cancel()
178 {
179 future_ = {};
180 return detail::lock_pool_or_throw(pool_)->cancel_pending_task(id_);
181 }
182
190 {
191 if (not future_.valid())
192 throw std::logic_error("Canceled task has no result");
193 return future_.get();
194 }
195
208 bool is_complete() const
209 {
210 if (not future_.valid())
211 return false;
212
213 return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
214 }
215
229 {
230 const auto state = detail::lock_pool_or_throw(pool_)->get_task_state(id_);
231
232 if (state == InternalTaskState::unknown)
233 {
234 if (is_complete())
235 return TaskState::complete;
236 else
237 return TaskState::canceled;
238 }
239 return static_cast<TaskState>(state);
240 }
241
242 private:
243 std::future<T> future_;
244 TaskId id_{ 0 };
245 std::weak_ptr<ThreadPool> pool_;
246 };
247
248
249 using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
250 using Duration = TimePoint::duration;
251
253 constexpr static std::size_t default_capacity{ 200 };
254
256 constexpr static std::size_t max_capacity{ 10'000'000 };
257
259 constexpr static std::size_t max_threads{ 10'000 };
260
268 ~ThreadPool();
269
307 template <typename Function>
309 add_task(Function fct, TimePoint start_time = {}, std::string name = {})
310 {
311 static_assert(
312 std::is_invocable<Function, ThreadPool&>::value
313 || std::is_invocable<Function>::value,
314 "Invalid function signature: Must be T fct() or T fct(ThreadPool&)");
315
316 using Result = std::invoke_result_t<Function, ThreadPool&>;
317
319 {
320 std::lock_guard<std::mutex> lock(mutex_);
321
322 if (is_full_i())
323 {
324 throw std::runtime_error(cat(
325 "Cannot add task: Pending queue has reached capacity (",
326 pending_tasks_.size(), ')'));
327 }
328
329 using PackagedTask = std::packaged_task<Result(ThreadPool&)>;
330
331 auto named_task_ptr = std::make_unique<NamedTaskImpl<PackagedTask>>(
332 PackagedTask{ std::move(fct) }, std::move(name));
333
335 next_task_id_, named_task_ptr->fct_.get_future(), shared_from_this() };
336
337 pending_tasks_.emplace_back(
338 next_task_id_, std::move(named_task_ptr), start_time);
339
340 ++next_task_id_;
341
342 return handle;
343 }();
344
345 cv_.notify_one();
346
347 return task_handle;
348 }
349
350 template <typename Function,
351 std::enable_if_t<std::is_invocable<Function>::value, bool> = true>
353 add_task(Function fct, TimePoint start_time = {}, std::string name = {})
354 {
355 return add_task(
356 [f = std::move(fct)](ThreadPool&) mutable { return f(); },
357 start_time, std::move(name));
358 }
359
360 template <typename Function,
361 std::enable_if_t<std::is_invocable<Function, ThreadPool&>::value, bool> = true>
363 add_task(Function fct, Duration delay_before_start, std::string name = {})
364 {
365 return add_task(std::move(fct),
366 std::chrono::system_clock::now() + delay_before_start, std::move(name));
367 }
368
369 template <typename Function,
370 std::enable_if_t<std::is_invocable<Function>::value, bool> = true>
372 add_task(Function fct, Duration delay_before_start, std::string name = {})
373 {
374 return add_task(std::move(fct),
375 std::chrono::system_clock::now() + delay_before_start, std::move(name));
376 }
377
378 template <typename Function,
379 std::enable_if_t<std::is_invocable<Function, ThreadPool&>::value, bool> = true>
381 add_task(Function fct, std::string name)
382 {
383 return add_task(std::move(fct), TimePoint{}, std::move(name));
384 }
385
386 template <typename Function,
387 std::enable_if_t<std::is_invocable<Function>::value, bool> = true>
389 add_task(Function fct, std::string name)
390 {
391 return add_task(std::move(fct), TimePoint{}, std::move(name));
392 }
393
402 GUL_EXPORT
403 std::size_t cancel_pending_tasks();
404
406 GUL_EXPORT
407 std::size_t capacity() const noexcept { return capacity_; }
408
410 GUL_EXPORT
411 std::size_t count_pending() const;
412
414 GUL_EXPORT
415 std::size_t count_threads() const noexcept;
416
418 GUL_EXPORT
419 std::vector<std::string> get_pending_task_names() const;
420
422 GUL_EXPORT
423 std::vector<std::string> get_running_task_names() const;
424
435 GUL_EXPORT
437
439 GUL_EXPORT
440 bool is_full() const noexcept;
441
446 GUL_EXPORT
447 bool is_idle() const;
448
450 GUL_EXPORT
452
462 GUL_EXPORT
464 std::size_t num_threads, std::size_t capacity = default_capacity);
465
466private:
474 enum class InternalTaskState
475 {
477 pending = static_cast<int>(TaskState::pending),
479 running = static_cast<int>(TaskState::running),
481 unknown
482 };
483
484 struct NamedTask
485 {
486 NamedTask(std::string name)
487 : name_{ std::move(name) }
488 {}
489
490 virtual ~NamedTask() = default;
491 virtual void operator()(ThreadPool& pool) = 0;
492
493 std::string name_;
494 };
495
496 template <typename FunctionType>
497 struct NamedTaskImpl : public NamedTask
498 {
499 public:
500 NamedTaskImpl(FunctionType fct, std::string name)
501 : NamedTask{ std::move(name) }
502 , fct_{ std::move(fct) }
503 {}
504
505 void operator()(ThreadPool& pool) override { fct_(pool); }
506
507 FunctionType fct_;
508 };
509
510 struct Task
511 {
512 TaskId id_{};
513 std::unique_ptr<NamedTask> named_task_;
514 TimePoint start_time_{}; // When the task is to be started (at least no earlier)
515
516 Task() = default;
517
518 Task(TaskId task_id, std::unique_ptr<NamedTask> named_task, TimePoint start_time)
519 : id_{ task_id }
520 , named_task_{ std::move(named_task) }
521 , start_time_{ start_time }
522 {}
523 };
524
525 std::size_t capacity_{ 0 };
526
531 std::vector<std::thread> threads_;
532
538 thread_local static ThreadId thread_id_;
539
544 std::condition_variable cv_;
545
546 mutable std::mutex mutex_; // Protects the following variables
547 std::vector<Task> pending_tasks_;
548 std::vector<TaskId> running_task_ids_;
549 std::vector<std::string> running_task_names_;
550 TaskId next_task_id_ = 0;
551 bool shutdown_requested_{ false };
552
553
571 ThreadPool(std::size_t num_threads, std::size_t capacity);
572
585 GUL_EXPORT
586 bool cancel_pending_task(TaskId task_id);
587
597 GUL_EXPORT
598 InternalTaskState get_task_state(TaskId task_id) const;
599
604 GUL_EXPORT
605 bool is_full_i() const noexcept;
606
613 void perform_work(std::size_t thread_index);
614};
615
630
632
633} // namespace gul17
634
635#endif // GUL17_THREADPOOL_H_
Declaration of the overload set for cat() and of the associated class ConvertingStringView.
A handle for a task that has (or had) been enqueued on a ThreadPool.
Definition ThreadPool.h:137
T get_result()
Block until the task has finished and return its result.
Definition ThreadPool.h:189
TaskHandle(TaskId id, std::future< T > future, std::shared_ptr< ThreadPool > pool)
Construct a TaskHandle.
Definition ThreadPool.h:160
bool cancel()
Remove the task from the queue if it is still pending.
Definition ThreadPool.h:177
TaskHandle()
Default-construct an invalid TaskHandle.
Definition ThreadPool.h:145
TaskState get_state() const
Determine if the task is running, waiting to be started, completed, or has been canceled.
Definition ThreadPool.h:228
bool is_complete() const
Determine whether the task has completed.
Definition ThreadPool.h:208
A pool of worker threads with a task queue.
Definition ThreadPool.h:111
~ThreadPool()
Destruct the ThreadPool and join all threads.
Definition ThreadPool.cc:69
std::vector< std::thread >::size_type ThreadId
A unique identifier for a thread in the pool in the range of [0, count_threads()).
Definition ThreadPool.h:117
static constexpr std::size_t default_capacity
Default capacity for the task queue.
Definition ThreadPool.h:253
GUL_EXPORT std::size_t count_threads() const noexcept
Return the number of threads in the pool.
Definition ThreadPool.cc:114
TaskHandle< std::invoke_result_t< Function, ThreadPool & > > add_task(Function fct, TimePoint start_time={}, std::string name={})
Enqueue a task.
Definition ThreadPool.h:309
GUL_EXPORT bool is_shutdown_requested() const
Determine whether the thread pool has been requested to shut down.
Definition ThreadPool.cc:181
GUL_EXPORT std::vector< std::string > get_running_task_names() const
Return a vector with the names of the tasks that are currently running.
Definition ThreadPool.cc:132
static constexpr std::size_t max_capacity
Maximum possible capacity for the task queue.
Definition ThreadPool.h:256
GUL_EXPORT bool is_idle() const
Return true if the pool has neither pending tasks nor tasks that are currently being executed.
Definition ThreadPool.cc:175
GUL_EXPORT bool is_full() const noexcept
Determine whether the queue for pending tasks is full (at capacity).
Definition ThreadPool.cc:164
static GUL_EXPORT std::shared_ptr< ThreadPool > make_shared(std::size_t num_threads, std::size_t capacity=default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for enqueuing task...
Definition ThreadPool.cc:187
GUL_EXPORT std::vector< std::string > get_pending_task_names() const
Return a vector with the names of the tasks that are waiting to be executed.
Definition ThreadPool.cc:119
GUL_EXPORT ThreadId get_thread_id() const
Return the thread pool ID of the current thread.
Definition ThreadPool.cc:156
std::uint64_t TaskId
A unique identifier for a task.
Definition ThreadPool.h:114
GUL_EXPORT std::size_t count_pending() const
Return the number of pending tasks.
Definition ThreadPool.cc:108
GUL_EXPORT std::size_t cancel_pending_tasks()
Remove all pending tasks from the queue.
Definition ThreadPool.cc:98
GUL_EXPORT std::size_t capacity() const noexcept
Return the maximum number of pending tasks that can be queued.
Definition ThreadPool.h:407
static constexpr std::size_t max_threads
Maximum possible number of threads.
Definition ThreadPool.h:259
std::shared_ptr< ThreadPool > make_thread_pool(std::size_t num_threads, std::size_t capacity=ThreadPool::default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for queuing tasks.
Definition ThreadPool.h:625
TaskState
An enum describing the state of an individual task.
Definition ThreadPool.h:66
@ running
The task is currently being executed.
@ pending
The task is waiting to be started.
@ canceled
The task was removed from the queue before it was started.
@ complete
The task has finished (successfully or by throwing an exception).
auto constexpr bit_set(unsigned bit) noexcept -> ReturnT
Set a bit in an integral type.
Definition bit_manip.h:121
std::string cat()
Efficiently concatenate an arbitrary number of strings and numbers.
Definition cat.h:97
Namespace gul17 contains all functions and classes of the General Utility Library.
Definition doxygen.h:26
Some metaprogramming traits for the General Utility Library.